// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

#include "arrow/acero/tpch_node.h"

#include <algorithm>
#include <bitset>
#include <cstring>
#include <memory>
#include <mutex>
#include <queue>
#include <random>
#include <unordered_map>
#include <unordered_set>
#include <vector>

#include "arrow/acero/exec_plan.h"
#include "arrow/acero/query_context.h"
#include "arrow/buffer.h"
#include "arrow/compute/exec.h"
#include "arrow/datum.h"
#include "arrow/util/async_util.h"
#include "arrow/util/formatting.h"
#include "arrow/util/future.h"
#include "arrow/util/io_util.h"
#include "arrow/util/logging.h"
#include "arrow/util/pcg_random.h"
#include "arrow/util/unreachable.h"

namespace arrow {

using arrow::internal::checked_cast;
using arrow::internal::GetRandomSeed;

namespace acero {
namespace internal {
/*
Architecture of the generator:
    This is a multithreaded implementation of TPC-H's DBGen data generator. For each table
    that doesn't depend on any other tables, it gets its own generator. For tables that
    are dependent on each other (namely [ORDERS, LINEITEM] and [PART, PARTSUPP]), we
implement a "double generator". Each generator is given a list of columns to generate.
Columns are then generated lazily a batch at a time. If column A depends on another column
B, column A will ensure that column B is always generated. We don't have to worry about
breaking cycles of dependencies because the dependency graph is acyclic (so there's no
code to even bother breaking cycles). Double generators work by maintaining two output
queues. Batches for each of the tables are generated in sync, and batches that are not
being output immediately are appended to the queue.

    To generate a batch, we grab a lock, increment a counter, check the output queue (if
applicable), and then call the necessary generator functions. To generate a column, we
first check if it's already been generated. If not, we allocate the batch and then fill it
according to the spec.

    There are a few types of columns that get generated:
    - Primary Keys: incrementing counters from 1 to N (with N being the number of rows).
These are generated by incrementing a counter (this counter is gated under a lock).
    - V-String: Random-length string of alphanumerics
    - Phone Number: 2, 3, 3, and 4, digit numbers separated by -'s
    - Random Numbers: random numbers within some range
    - Expressions: expressions based on some other columns
    Please consult the spec for intended behavior of each individual column. Columns are
generated by a function of the same name (so e.g. the column PS_PARTKEY is generated by a
function PS_PARTKEY).
*/

namespace {

const char* NameParts[] = {
    "almond",    "antique",    "aquamarine", "azure",     "beige",    "bisque",
    "black",     "blanched",   "blue",       "blush",     "brown",    "burlywood",
    "burnished", "chartreuse", "chiffon",    "chocolate", "coral",    "cornflower",
    "cornsilk",  "cream",      "cyan",       "dark",      "deep",     "dim",
    "dodger",    "drab",       "firebrick",  "floral",    "forest",   "frosted",
    "gainsboro", "ghost",      "goldenrod",  "green",     "grey",     "honeydew",
    "hot",       "indian",     "ivory",      "khaki",     "lace",     "lavender",
    "lawn",      "lemon",      "light",      "lime",      "linen",    "magenta",
    "maroon",    "medium",     "metallic",   "midnight",  "mint",     "misty",
    "moccasin",  "navajo",     "navy",       "olive",     "orange",   "orchid",
    "pale",      "papaya",     "peach",      "peru",      "pink",     "plum",
    "powder",    "puff",       "purple",     "red",       "rose",     "rosy",
    "royal",     "saddle",     "salmon",     "sandy",     "seashell", "sienna",
    "sky",       "slate",      "smoke",      "snow",      "spring",   "steel",
    "tan",       "thistle",    "tomato",     "turquoise", "violet",   "wheat",
    "white",     "yellow",
};
constexpr size_t kNumNameParts = sizeof(NameParts) / sizeof(NameParts[0]);

const char* Types_1[] = {
    "STANDARD ", "SMALL ", "MEDIUM ", "LARGE ", "ECONOMY ", "PROMO ",
};
constexpr size_t kNumTypes_1 = sizeof(Types_1) / sizeof(Types_1[0]);

const char* Types_2[] = {
    "ANODIZED ", "BURNISHED ", "PLATED ", "POLISHED ", "BRUSHED ",
};
constexpr size_t kNumTypes_2 = sizeof(Types_2) / sizeof(Types_2[0]);

const char* Types_3[] = {
    "TIN", "NICKEL", "BRASS", "STEEL", "COPPER",
};
constexpr size_t kNumTypes_3 = sizeof(Types_3) / sizeof(Types_3[0]);

const char* Containers_1[] = {
    "SM ", "LG ", "MD ", "JUMBO ", "WRAP ",
};
constexpr size_t kNumContainers_1 = sizeof(Containers_1) / sizeof(Containers_1[0]);

const char* Containers_2[] = {
    "CASE", "BOX", "BAG", "JAR", "PKG", "PACK", "CAN", "DRUM",
};
constexpr size_t kNumContainers_2 = sizeof(Containers_2) / sizeof(Containers_2[0]);

const char* Segments[] = {
    "AUTOMOBILE", "BUILDING", "FURNITURE", "MACHINERY", "HOUSEHOLD",
};
constexpr size_t kNumSegments = sizeof(Segments) / sizeof(Segments[0]);

const char* Priorities[] = {
    "1-URGENT", "2-HIGH", "3-MEDIUM", "4-NOT SPECIFIED", "5-LOW",
};
constexpr size_t kNumPriorities = sizeof(Priorities) / sizeof(Priorities[0]);

const char* Instructions[] = {
    "DELIVER IN PERSON",
    "COLLECT COD",
    "NONE",
    "TAKE BACK RETURN",
};
constexpr size_t kNumInstructions = sizeof(Instructions) / sizeof(Instructions[0]);

const char* Modes[] = {
    "REG AIR", "AIR", "RAIL", "SHIP", "TRUCK", "MAIL", "FOB",
};
constexpr size_t kNumModes = sizeof(Modes) / sizeof(Modes[0]);

const char* Nouns[] = {
    "foxes ",        "ideas ",       "theodolites ",    "pinto beans ", "instructions ",
    "dependencies ", "excuses ",     "platelets ",      "asymptotes ",  "courts ",
    "dolphins ",     "multipliers ", "sautemes ",       "warthogs ",    "frets ",
    "dinos ",        "attainments ", "somas ",          "Tiresias '",   "patterns ",
    "forges ",       "braids ",      "hockey players ", "frays ",       "warhorses ",
    "dugouts ",      "notomis ",     "epitaphs ",       "pearls ",      "tithes ",
    "waters ",       "orbits ",      "gifts ",          "sheaves ",     "depths ",
    "sentiments ",   "decoys ",      "realms ",         "pains ",       "grouches ",
    "escapades ",
};
constexpr size_t kNumNouns = sizeof(Nouns) / sizeof(Nouns[0]);

const char* Verbs[] = {
    "sleep ",  "wake ",    "are ",    "cajole ",    "haggle ",   "nag ",     "use ",
    "boost ",  "affix ",   "detect ", "integrate ", "maintain ", "nod ",     "was ",
    "lose ",   "sublate ", "solve ",  "thrash ",    "promise ",  "engage ",  "hinder ",
    "print ",  "x-ray ",   "breach ", "eat ",       "grow ",     "impress ", "mold ",
    "poach ",  "serve ",   "run ",    "dazzle ",    "snooze ",   "doze ",    "unwind ",
    "kindle ", "play ",    "hang ",   "believe ",   "doubt ",
};
constexpr size_t kNumVerbs = sizeof(Verbs) / sizeof(Verbs[0]);

const char* Adjectives[] = {
    "furious ",  "sly ",       "careful ",  "blithe ", "quick ",  "fluffy ",  "slow ",
    "quiet ",    "ruthless ",  "thin ",     "close ",  "dogged ", "daring ",  "brave ",
    "stealthy ", "permanent ", "enticing ", "idle ",   "busy ",   "regular ", "final ",
    "ironic ",   "even ",      "bold ",     "silent ",
};
constexpr size_t kNumAdjectives = sizeof(Adjectives) / sizeof(Adjectives[0]);

const char* Adverbs[] = {
    "sometimes ",   "always ",     "never ",    "furiously ", "slyly ",     "carefully ",
    "blithely ",    "quickly ",    "fluffily ", "slowly ",    "quietly ",   "ruthlessly ",
    "thinly ",      "closely ",    "doggedly ", "daringly ",  "bravely ",   "stealthily ",
    "permanently ", "enticingly ", "idly ",     "busily ",    "regularly ", "finally ",
    "ironically ",  "evenly ",     "boldly ",   "silently ",
};
constexpr size_t kNumAdverbs = sizeof(Adverbs) / sizeof(Adverbs[0]);

const char* Prepositions[] = {
    "about ",  "above ",        "according to ", "across ",  "after ",      "against ",
    "along ",  "alongside of ", "among ",        "around ",  "at ",         "atop ",
    "before ", "behind ",       "beneath ",      "beside ",  "besides ",    "between ",
    "beyond ", "beyond ",       "by ",           "despite ", "during ",     "except ",
    "for ",    "from ",         "in place of ",  "inside ",  "instead of ", "into ",
    "near ",   "of ",           "on ",           "outside ", "over ",       "past ",
    "since ",  "through ",      "throughout ",   "to ",      "toward ",     "under ",
    "until ",  "up ",           "upon ",         "without ", "with ",       "within ",
};
constexpr size_t kNumPrepositions = sizeof(Prepositions) / sizeof(Prepositions[0]);

const char* Auxiliaries[] = {
    "do ",
    "may ",
    "might ",
    "shall ",
    "will ",
    "would ",
    "can ",
    "could ",
    "should ",
    "ought to ",
    "must ",
    "will have to ",
    "shall have to ",
    "could have to ",
    "should have to ",
    "must have to ",
    "need to ",
    "try to ",
};
constexpr size_t kNumAuxiliaries = sizeof(Auxiliaries) / sizeof(Auxiliaries[0]);

const char* Terminators[] = {
    ".", ";", ":", "?", "!", "--",
};
constexpr size_t kNumTerminators = sizeof(Terminators) / sizeof(Terminators[0]);

constexpr uint32_t kStartDate =
    8035;  // January 1, 1992 is 8035 days after January 1, 1970
constexpr uint32_t kCurrentDate =
    9298;  // June 17, 1995 is 9298 days after January 1, 1970
constexpr uint32_t kEndDate =
    10591;  // December 12, 1998 is 10591 days after January 1, 1970

std::uniform_int_distribution<int64_t> kSeedDist(std::numeric_limits<int64_t>::min(),
                                                 std::numeric_limits<int64_t>::max());
// The spec says to generate a 300 MB string according to a grammar. This is a
// concurrent implementation of the generator. Each thread generates the text in
// (up to) 8KB chunks of text. The generator maintains a cursor into the
// 300 MB buffer. After generating the chunk, the cursor is incremented
// to reserve space, and the chunk is memcpy-d in.
// This text is used to generate the COMMENT columns. To generate a comment, the spec
// says to pick a random length and a random offset into the 300 MB buffer (it does
// not specify it should be word/sentence aligned), and that slice of text becomes
// the comment.
class TpchPseudotext {
 public:
  Status EnsureInitialized(random::pcg32_fast& rng);
  Result<Datum> GenerateComments(size_t num_comments, size_t min_length,
                                 size_t max_length, random::pcg32_fast& rng);

 private:
  bool GenerateWord(int64_t& offset, random::pcg32_fast& rng, char* arr,
                    const char** words, size_t num_choices);
  bool GenerateNoun(int64_t& offset, random::pcg32_fast& rng, char* arr);
  bool GenerateVerb(int64_t& offset, random::pcg32_fast& rng, char* arr);
  bool GenerateAdjective(int64_t& offset, random::pcg32_fast& rng, char* arr);
  bool GenerateAdverb(int64_t& offset, random::pcg32_fast& rng, char* arr);
  bool GeneratePreposition(int64_t& offset, random::pcg32_fast& rng, char* arr);
  bool GenerateAuxiliary(int64_t& offset, random::pcg32_fast& rng, char* arr);
  bool GenerateTerminator(int64_t& offset, random::pcg32_fast& rng, char* arr);

  bool GenerateNounPhrase(int64_t& offset, random::pcg32_fast& rng, char* arr);
  bool GenerateVerbPhrase(int64_t& offset, random::pcg32_fast& rng, char* arr);
  bool GeneratePrepositionalPhrase(int64_t& offset, random::pcg32_fast& rng, char* arr);

  bool GenerateSentence(int64_t& offset, random::pcg32_fast& rng, char* arr);

  std::atomic<bool> done_ = {false};
  int64_t generated_offset_{0};
  std::mutex text_guard_;
  std::unique_ptr<Buffer> text_;
  static constexpr int64_t kChunkSize = 8192;
  static constexpr int64_t kTextBytes = 300 * 1024 * 1024;  // 300 MB
};

static TpchPseudotext g_text;

Status TpchPseudotext::EnsureInitialized(random::pcg32_fast& rng) {
  if (done_.load()) return Status::OK();

  {
    std::lock_guard<std::mutex> lock(text_guard_);
    if (!text_) {
      ARROW_ASSIGN_OR_RAISE(text_, AllocateBuffer(kTextBytes));
    }
  }
  char* out = reinterpret_cast<char*>(text_->mutable_data());
  char temp_buff[kChunkSize];

  while (!done_.load()) {
    int64_t known_valid_offset = 0;
    int64_t try_offset = 0;
    while (GenerateSentence(try_offset, rng, temp_buff)) known_valid_offset = try_offset;

    bool last_one;
    int64_t offset;
    int64_t memcpy_size;
    {
      std::lock_guard<std::mutex> lock(text_guard_);
      if (done_.load()) return Status::OK();
      int64_t bytes_remaining = kTextBytes - generated_offset_;
      memcpy_size = std::min(known_valid_offset, bytes_remaining);
      offset = generated_offset_;
      generated_offset_ += memcpy_size;
      last_one = generated_offset_ == kTextBytes;
    }
    std::memcpy(out + offset, temp_buff, memcpy_size);
    if (last_one) done_.store(true);
  }
  return Status::OK();
}

Result<Datum> TpchPseudotext::GenerateComments(size_t num_comments, size_t min_length,
                                               size_t max_length,
                                               random::pcg32_fast& rng) {
  RETURN_NOT_OK(EnsureInitialized(rng));
  std::uniform_int_distribution<size_t> length_dist(min_length, max_length);
  ARROW_ASSIGN_OR_RAISE(std::unique_ptr<Buffer> offset_buffer,
                        AllocateBuffer(sizeof(int32_t) * (num_comments + 1)));
  int32_t* offsets = reinterpret_cast<int32_t*>(offset_buffer->mutable_data());
  offsets[0] = 0;
  for (size_t i = 1; i <= num_comments; i++)
    offsets[i] = offsets[i - 1] + static_cast<int32_t>(length_dist(rng));

  ARROW_ASSIGN_OR_RAISE(std::unique_ptr<Buffer> comment_buffer,
                        AllocateBuffer(offsets[num_comments]));
  char* comments = reinterpret_cast<char*>(comment_buffer->mutable_data());
  for (size_t i = 0; i < num_comments; i++) {
    size_t length = offsets[i + 1] - offsets[i];
    std::uniform_int_distribution<size_t> offset_dist(0, kTextBytes - length);
    size_t offset_in_text = offset_dist(rng);
    std::memcpy(comments + offsets[i], text_->data() + offset_in_text, length);
  }
  ArrayData ad(utf8(), num_comments,
               {nullptr, std::move(offset_buffer), std::move(comment_buffer)});
  return ad;
}

bool TpchPseudotext::GenerateWord(int64_t& offset, random::pcg32_fast& rng, char* arr,
                                  const char** words, size_t num_choices) {
  std::uniform_int_distribution<size_t> dist(0, num_choices - 1);
  const char* word = words[dist(rng)];
  size_t length = std::strlen(word);
  if (offset + length > kChunkSize) return false;
  std::memcpy(arr + offset, word, length);
  offset += length;
  return true;
}

bool TpchPseudotext::GenerateNoun(int64_t& offset, random::pcg32_fast& rng, char* arr) {
  return GenerateWord(offset, rng, arr, Nouns, kNumNouns);
}

bool TpchPseudotext::GenerateVerb(int64_t& offset, random::pcg32_fast& rng, char* arr) {
  return GenerateWord(offset, rng, arr, Verbs, kNumVerbs);
}

bool TpchPseudotext::GenerateAdjective(int64_t& offset, random::pcg32_fast& rng,
                                       char* arr) {
  return GenerateWord(offset, rng, arr, Adjectives, kNumAdjectives);
}

bool TpchPseudotext::GenerateAdverb(int64_t& offset, random::pcg32_fast& rng, char* arr) {
  return GenerateWord(offset, rng, arr, Adverbs, kNumAdverbs);
}

bool TpchPseudotext::GeneratePreposition(int64_t& offset, random::pcg32_fast& rng,
                                         char* arr) {
  return GenerateWord(offset, rng, arr, Prepositions, kNumPrepositions);
}

bool TpchPseudotext::GenerateAuxiliary(int64_t& offset, random::pcg32_fast& rng,
                                       char* arr) {
  return GenerateWord(offset, rng, arr, Auxiliaries, kNumAuxiliaries);
}

bool TpchPseudotext::GenerateTerminator(int64_t& offset, random::pcg32_fast& rng,
                                        char* arr) {
  bool result = GenerateWord(offset, rng, arr, Terminators, kNumTerminators);
  // Swap the space with the terminator
  if (result) std::swap(*(arr + offset - 2), *(arr + offset - 1));
  return result;
}

bool TpchPseudotext::GenerateNounPhrase(int64_t& offset, random::pcg32_fast& rng,
                                        char* arr) {
  std::uniform_int_distribution<size_t> dist(0, 3);
  const char* comma_space = ", ";
  bool success = true;
  switch (dist(rng)) {
    case 0:
      success &= GenerateNoun(offset, rng, arr);
      break;
    case 1:
      success &= GenerateAdjective(offset, rng, arr);
      success &= GenerateNoun(offset, rng, arr);
      break;
    case 2:
      success &= GenerateAdjective(offset, rng, arr);
      success &= GenerateWord(--offset, rng, arr, &comma_space, 1);
      success &= GenerateAdjective(offset, rng, arr);
      success &= GenerateNoun(offset, rng, arr);
      break;
    case 3:
      success &= GenerateAdverb(offset, rng, arr);
      success &= GenerateAdjective(offset, rng, arr);
      success &= GenerateNoun(offset, rng, arr);
      break;
    default:
      Unreachable("Random number should be between 0 and 3 inclusive");
      break;
  }
  return success;
}

bool TpchPseudotext::GenerateVerbPhrase(int64_t& offset, random::pcg32_fast& rng,
                                        char* arr) {
  std::uniform_int_distribution<size_t> dist(0, 3);
  bool success = true;
  switch (dist(rng)) {
    case 0:
      success &= GenerateVerb(offset, rng, arr);
      break;
    case 1:
      success &= GenerateAuxiliary(offset, rng, arr);
      success &= GenerateVerb(offset, rng, arr);
      break;
    case 2:
      success &= GenerateVerb(offset, rng, arr);
      success &= GenerateAdverb(offset, rng, arr);
      break;
    case 3:
      success &= GenerateAuxiliary(offset, rng, arr);
      success &= GenerateVerb(offset, rng, arr);
      success &= GenerateAdverb(offset, rng, arr);
      break;
    default:
      Unreachable("Random number should be between 0 and 3 inclusive");
      break;
  }
  return success;
}

bool TpchPseudotext::GeneratePrepositionalPhrase(int64_t& offset, random::pcg32_fast& rng,
                                                 char* arr) {
  const char* the_space = "the ";
  bool success = true;
  success &= GeneratePreposition(offset, rng, arr);
  success &= GenerateWord(offset, rng, arr, &the_space, 1);
  success &= GenerateNounPhrase(offset, rng, arr);
  return success;
}

bool TpchPseudotext::GenerateSentence(int64_t& offset, random::pcg32_fast& rng,
                                      char* arr) {
  std::uniform_int_distribution<size_t> dist(0, 4);
  bool success = true;
  switch (dist(rng)) {
    case 0:
      success &= GenerateNounPhrase(offset, rng, arr);
      success &= GenerateVerbPhrase(offset, rng, arr);
      success &= GenerateTerminator(offset, rng, arr);
      break;
    case 1:
      success &= GenerateNounPhrase(offset, rng, arr);
      success &= GenerateVerbPhrase(offset, rng, arr);
      success &= GeneratePrepositionalPhrase(offset, rng, arr);
      success &= GenerateTerminator(offset, rng, arr);
      break;
    case 2:
      success &= GenerateNounPhrase(offset, rng, arr);
      success &= GenerateVerbPhrase(offset, rng, arr);
      success &= GenerateNounPhrase(offset, rng, arr);
      success &= GenerateTerminator(offset, rng, arr);
      break;
    case 3:
      success &= GenerateNounPhrase(offset, rng, arr);
      success &= GeneratePrepositionalPhrase(offset, rng, arr);
      success &= GenerateVerbPhrase(offset, rng, arr);
      success &= GenerateNounPhrase(offset, rng, arr);
      success &= GenerateTerminator(offset, rng, arr);
      break;
    case 4:
      success &= GenerateNounPhrase(offset, rng, arr);
      success &= GeneratePrepositionalPhrase(offset, rng, arr);
      success &= GenerateVerbPhrase(offset, rng, arr);
      success &= GeneratePrepositionalPhrase(offset, rng, arr);
      success &= GenerateTerminator(offset, rng, arr);
      break;
    default:
      Unreachable("Random number should be between 0 and 5 inclusive");
      break;
  }
  return success;
}

class TpchTableGenerator {
 public:
  using OutputBatchCallback = std::function<Status(ExecBatch)>;
  using FinishedCallback = std::function<Status(int64_t)>;
  using GenerateFn = std::function<Status(size_t)>;
  using ScheduleCallback = std::function<Status(GenerateFn)>;
  using AbortCallback = std::function<void()>;

  virtual Status Init(std::vector<std::string> columns, double scale_factor,
                      int64_t batch_size, int64_t seed) = 0;

  virtual Status StartProducing(size_t num_threads, OutputBatchCallback output_callback,
                                FinishedCallback finished_callback,
                                ScheduleCallback schedule_callback) = 0;

  bool Abort() {
    bool expected = false;
    return done_.compare_exchange_strong(expected, true);
  }

  virtual std::shared_ptr<Schema> schema() const = 0;

  virtual ~TpchTableGenerator() = default;

 protected:
  int64_t seed_ = {0};
  std::atomic<bool> done_ = {false};
  std::atomic<int64_t> batches_outputted_ = {0};
};

int GetNumDigits(int64_t x) {
  // This if statement chain is for MAXIMUM SPEED
  // Source:
  // https://stackoverflow.com/questions/1068849/how-do-i-determine-the-number-of-digits-of-an-integer-in-c
  ARROW_DCHECK(x >= 0);
  if (x < 10ll) return 1;
  if (x < 100ll) return 2;
  if (x < 1000ll) return 3;
  if (x < 10000ll) return 4;
  if (x < 100000ll) return 5;
  if (x < 1000000ll) return 6;
  if (x < 10000000ll) return 7;
  if (x < 100000000ll) return 8;
  if (x < 1000000000ll) return 9;
  if (x < 10000000000ll) return 10;
  if (x < 100000000000ll) return 11;
  if (x < 1000000000000ll) return 12;
  if (x < 10000000000000ll) return 13;
  if (x < 100000000000000ll) return 14;
  if (x < 1000000000000000ll) return 15;
  if (x < 10000000000000000ll) return 16;
  if (x < 100000000000000000ll) return 17;
  if (x < 1000000000000000000ll) return 18;
  Unreachable("Positive 64-bit integer should never have more than 18 digits");
  return -1;
}

void AppendNumberPaddedToNineDigits(char* out, int64_t x) {
  size_t kPad = 9;
  out += std::max(kPad, static_cast<size_t>(GetNumDigits(x)));
  arrow::internal::detail::FormatAllDigitsLeftPadded(x, kPad, '0', &out);
}

Result<std::shared_ptr<Schema>> SetOutputColumns(
    const std::vector<std::string>& columns,
    const std::vector<std::shared_ptr<DataType>>& types,
    const std::unordered_map<std::string, int>& name_map, std::vector<int>& gen_list) {
  gen_list.clear();
  std::vector<std::shared_ptr<Field>> fields;
  if (columns.empty()) {
    fields.resize(name_map.size());
    gen_list.resize(name_map.size());
    for (auto pair : name_map) {
      int col_idx = pair.second;
      fields[col_idx] = field(pair.first, types[col_idx]);
      gen_list[col_idx] = col_idx;
    }
    return schema(std::move(fields));
  } else {
    for (const std::string& col : columns) {
      auto entry = name_map.find(col);
      if (entry == name_map.end()) return Status::Invalid("Not a valid column name");
      int col_idx = static_cast<int>(entry->second);
      fields.push_back(field(col, types[col_idx]));
      gen_list.push_back(col_idx);
    }
    return schema(std::move(fields));
  }
}

Result<Datum> RandomVString(random::pcg32_fast& rng, int64_t num_rows, int32_t min_length,
                            int32_t max_length) {
  std::uniform_int_distribution<int32_t> length_dist(min_length, max_length);
  ARROW_ASSIGN_OR_RAISE(std::unique_ptr<Buffer> offset_buff,
                        AllocateBuffer((num_rows + 1) * sizeof(int32_t)));
  int32_t* offsets = reinterpret_cast<int32_t*>(offset_buff->mutable_data());
  offsets[0] = 0;
  for (int64_t i = 1; i <= num_rows; i++) offsets[i] = offsets[i - 1] + length_dist(rng);
  ARROW_ASSIGN_OR_RAISE(std::unique_ptr<Buffer> str_buff,
                        AllocateBuffer(offsets[num_rows]));
  char* str = reinterpret_cast<char*>(str_buff->mutable_data());

  // Spec says to pick random alphanumeric characters from a set of at least
  // 64 symbols. Now, let's think critically here: 26 letters in the alphabet,
  // so 52 total for upper and lower case, and 10 possible digits gives 62
  // characters...
  // dbgen solves this by including a space and a comma as well, so we'll
  // copy that.
  const char alpha_numerics[65] =
      "0123456789abcdefghijklmnopqrstuvwxyz ABCDEFGHIJKLMNOPQRSTUVWXYZ,";
  std::uniform_int_distribution<int> char_dist(0, 63);
  for (int32_t i = 0; i < offsets[num_rows]; i++) str[i] = alpha_numerics[char_dist(rng)];

  ArrayData ad(utf8(), num_rows, {nullptr, std::move(offset_buff), std::move(str_buff)});
  return ad;
}

void GeneratePhoneNumber(char* out, random::pcg32_fast& rng, int32_t country) {
  std::uniform_int_distribution<int32_t> three_digit(100, 999);
  std::uniform_int_distribution<int32_t> four_digit(1000, 9999);

  int32_t country_code = country + 10;
  int32_t l1 = three_digit(rng);
  int32_t l2 = three_digit(rng);
  int32_t l3 = four_digit(rng);

  out += 15;
  arrow::internal::detail::FormatAllDigits(l3, &out);
  *(--out) = '-';
  arrow::internal::detail::FormatAllDigits(l2, &out);
  *(--out) = '-';
  arrow::internal::detail::FormatAllDigits(l1, &out);
  *(--out) = '-';
  arrow::internal::detail::FormatTwoDigits(country_code, &out);
}

using GenerateColumnFn = std::function<Status(size_t)>;
class PartAndPartSupplierGenerator {
 public:
  Status Init(size_t num_threads, int64_t batch_size, double scale_factor, int64_t seed) {
    if (!inited_) {
      inited_ = true;
      batch_size_ = batch_size;
      scale_factor_ = scale_factor;

      thread_local_data_.resize(num_threads);
      random::pcg64_fast seed_rng(seed);
      for (ThreadLocalData& tld : thread_local_data_) {
        constexpr int kMaxNumDistinctStrings = 5;
        tld.string_indices.resize(kMaxNumDistinctStrings * batch_size_);
        tld.rng.seed(kSeedDist(seed_rng));
      }
      part_rows_to_generate_ = static_cast<int64_t>(scale_factor_ * 200000);
    }
    return Status::OK();
  }

  int64_t part_batches_generated() const { return part_batches_generated_.load(); }

  int64_t partsupp_batches_generated() const {
    return partsupp_batches_generated_.load();
  }

  Result<std::shared_ptr<Schema>> SetPartOutputColumns(
      const std::vector<std::string>& cols) {
    return SetOutputColumns(cols, kPartTypes, kPartNameMap, part_cols_);
  }

  Result<std::shared_ptr<Schema>> SetPartSuppOutputColumns(
      const std::vector<std::string>& cols) {
    return SetOutputColumns(cols, kPartsuppTypes, kPartsuppNameMap, partsupp_cols_);
  }

  Result<std::optional<ExecBatch>> NextPartBatch(size_t thread_index) {
    ThreadLocalData& tld = thread_local_data_[thread_index];
    {
      std::lock_guard<std::mutex> lock(part_output_queue_mutex_);
      if (!part_output_queue_.empty()) {
        ExecBatch batch = std::move(part_output_queue_.front());
        part_output_queue_.pop();
        return batch;
      } else if (part_rows_generated_ == part_rows_to_generate_) {
        return std::nullopt;
      } else {
        tld.partkey_start = part_rows_generated_;
        tld.part_to_generate =
            std::min(batch_size_, part_rows_to_generate_ - part_rows_generated_);
        part_rows_generated_ += tld.part_to_generate;

        int64_t num_ps_batches = PartsuppBatchesToGenerate(thread_index);
        part_batches_generated_.fetch_add(1);
        partsupp_batches_generated_.fetch_add(num_ps_batches);
        ARROW_DCHECK(part_rows_generated_ <= part_rows_to_generate_);
      }
    }
    tld.part.resize(PART::kNumCols);
    std::fill(tld.part.begin(), tld.part.end(), Datum());
    RETURN_NOT_OK(InitPartsupp(thread_index));

    for (int col : part_cols_) RETURN_NOT_OK(kPartGenerators[col](thread_index));
    for (int col : partsupp_cols_) RETURN_NOT_OK(kPartsuppGenerators[col](thread_index));

    std::vector<Datum> part_result(part_cols_.size());
    for (size_t i = 0; i < part_cols_.size(); i++) {
      int col_idx = part_cols_[i];
      part_result[i] = tld.part[col_idx];
    }
    if (!partsupp_cols_.empty()) {
      std::vector<ExecBatch> partsupp_results;
      for (size_t ibatch = 0; ibatch < tld.partsupp.size(); ibatch++) {
        std::vector<Datum> partsupp_result(partsupp_cols_.size());
        for (size_t icol = 0; icol < partsupp_cols_.size(); icol++) {
          int col_idx = partsupp_cols_[icol];
          partsupp_result[icol] = tld.partsupp[ibatch][col_idx];
        }
        ARROW_ASSIGN_OR_RAISE(ExecBatch eb, ExecBatch::Make(std::move(partsupp_result)));
        partsupp_results.emplace_back(std::move(eb));
      }
      {
        std::lock_guard<std::mutex> guard(partsupp_output_queue_mutex_);
        for (ExecBatch& eb : partsupp_results) {
          partsupp_output_queue_.emplace(std::move(eb));
        }
      }
    }
    return ExecBatch::Make(std::move(part_result));
  }

  Result<std::optional<ExecBatch>> NextPartSuppBatch(size_t thread_index) {
    ThreadLocalData& tld = thread_local_data_[thread_index];
    {
      std::lock_guard<std::mutex> lock(partsupp_output_queue_mutex_);
      if (!partsupp_output_queue_.empty()) {
        ExecBatch result = std::move(partsupp_output_queue_.front());
        partsupp_output_queue_.pop();
        return result;
      }
    }
    {
      std::lock_guard<std::mutex> lock(part_output_queue_mutex_);
      if (part_rows_generated_ == part_rows_to_generate_) {
        return std::nullopt;
      } else {
        tld.partkey_start = part_rows_generated_;
        tld.part_to_generate =
            std::min(batch_size_, part_rows_to_generate_ - part_rows_generated_);
        part_rows_generated_ += tld.part_to_generate;
        int64_t num_ps_batches = PartsuppBatchesToGenerate(thread_index);
        part_batches_generated_.fetch_add(1);
        partsupp_batches_generated_.fetch_add(num_ps_batches);
        ARROW_DCHECK(part_rows_generated_ <= part_rows_to_generate_);
      }
    }
    tld.part.resize(PART::kNumCols);
    std::fill(tld.part.begin(), tld.part.end(), Datum());
    RETURN_NOT_OK(InitPartsupp(thread_index));

    for (int col : part_cols_) RETURN_NOT_OK(kPartGenerators[col](thread_index));
    for (int col : partsupp_cols_) RETURN_NOT_OK(kPartsuppGenerators[col](thread_index));
    if (!part_cols_.empty()) {
      std::vector<Datum> part_result(part_cols_.size());
      for (size_t i = 0; i < part_cols_.size(); i++) {
        int col_idx = part_cols_[i];
        part_result[i] = tld.part[col_idx];
      }
      ARROW_ASSIGN_OR_RAISE(ExecBatch part_batch,
                            ExecBatch::Make(std::move(part_result)));
      {
        std::lock_guard<std::mutex> lock(part_output_queue_mutex_);
        part_output_queue_.emplace(std::move(part_batch));
      }
    }
    std::vector<ExecBatch> partsupp_results;
    for (size_t ibatch = 0; ibatch < tld.partsupp.size(); ibatch++) {
      std::vector<Datum> partsupp_result(partsupp_cols_.size());
      for (size_t icol = 0; icol < partsupp_cols_.size(); icol++) {
        int col_idx = partsupp_cols_[icol];
        partsupp_result[icol] = tld.partsupp[ibatch][col_idx];
      }
      ARROW_ASSIGN_OR_RAISE(ExecBatch eb, ExecBatch::Make(std::move(partsupp_result)));
      partsupp_results.emplace_back(std::move(eb));
    }
    // Return the first batch, enqueue the rest.
    {
      std::lock_guard<std::mutex> lock(partsupp_output_queue_mutex_);
      for (size_t i = 1; i < partsupp_results.size(); i++)
        partsupp_output_queue_.emplace(std::move(partsupp_results[i]));
    }
    return std::move(partsupp_results[0]);
  }

 private:
#define FOR_EACH_PART_COLUMN(F) \
  F(P_PARTKEY)                  \
  F(P_NAME)                     \
  F(P_MFGR)                     \
  F(P_BRAND)                    \
  F(P_TYPE)                     \
  F(P_SIZE)                     \
  F(P_CONTAINER)                \
  F(P_RETAILPRICE)              \
  F(P_COMMENT)

#define FOR_EACH_PARTSUPP_COLUMN(F) \
  F(PS_PARTKEY)                     \
  F(PS_SUPPKEY)                     \
  F(PS_AVAILQTY)                    \
  F(PS_SUPPLYCOST)                  \
  F(PS_COMMENT)

#define MAKE_ENUM(col) col,
  struct PART {
    enum {
      FOR_EACH_PART_COLUMN(MAKE_ENUM) kNumCols,
    };
  };
  struct PARTSUPP {
    enum {
      FOR_EACH_PARTSUPP_COLUMN(MAKE_ENUM) kNumCols,
    };
  };

#define MAKE_STRING_MAP(col) {#col, PART::col},
  const std::unordered_map<std::string, int> kPartNameMap = {
      FOR_EACH_PART_COLUMN(MAKE_STRING_MAP)};
#undef MAKE_STRING_MAP
#define MAKE_STRING_MAP(col) {#col, PARTSUPP::col},
  const std::unordered_map<std::string, int> kPartsuppNameMap = {
      FOR_EACH_PARTSUPP_COLUMN(MAKE_STRING_MAP)};
#undef MAKE_STRING_MAP
#define MAKE_FN_ARRAY(col) \
  [this](size_t thread_index) { return this->col(thread_index); },
  std::vector<GenerateColumnFn> kPartGenerators = {FOR_EACH_PART_COLUMN(MAKE_FN_ARRAY)};
  std::vector<GenerateColumnFn> kPartsuppGenerators = {
      FOR_EACH_PARTSUPP_COLUMN(MAKE_FN_ARRAY)};
#undef MAKE_FN_ARRAY
#undef FOR_EACH_LINEITEM_COLUMN
#undef FOR_EACH_ORDERS_COLUMN

  const std::vector<std::shared_ptr<DataType>> kPartTypes = {
      int32(), utf8(),  fixed_size_binary(25), fixed_size_binary(10),
      utf8(),  int32(), fixed_size_binary(10), decimal128(12, 2),
      utf8(),
  };

  const std::vector<std::shared_ptr<DataType>> kPartsuppTypes = {
      int32(), int32(), int32(), decimal128(12, 2), utf8(),
  };

  Status AllocatePartBatch(size_t thread_index, int column) {
    ThreadLocalData& tld = thread_local_data_[thread_index];
    ARROW_DCHECK(tld.part[column].kind() == Datum::NONE);
    int32_t byte_width = kPartTypes[column]->byte_width();
    ARROW_ASSIGN_OR_RAISE(std::unique_ptr<Buffer> buff,
                          AllocateBuffer(tld.part_to_generate * byte_width));
    ArrayData ad(kPartTypes[column], tld.part_to_generate, {nullptr, std::move(buff)});
    tld.part[column] = std::move(ad);
    return Status::OK();
  }

  Status P_PARTKEY(size_t thread_index) {
    ThreadLocalData& tld = thread_local_data_[thread_index];
    if (tld.part[PART::P_PARTKEY].kind() == Datum::NONE) {
      RETURN_NOT_OK(AllocatePartBatch(thread_index, PART::P_PARTKEY));
      int32_t* p_partkey = reinterpret_cast<int32_t*>(
          tld.part[PART::P_PARTKEY].array()->buffers[1]->mutable_data());
      for (int64_t i = 0; i < tld.part_to_generate; i++) {
        p_partkey[i] = static_cast<int32_t>(tld.partkey_start + i + 1);
        ARROW_DCHECK(1 <= p_partkey[i] && p_partkey[i] <= part_rows_to_generate_);
      }
    }
    return Status::OK();
  }

  Status P_NAME(size_t thread_index) {
    ThreadLocalData& tld = thread_local_data_[thread_index];
    if (tld.part[PART::P_NAME].kind() == Datum::NONE) {
      std::uniform_int_distribution<int32_t> dist(
          0, static_cast<uint8_t>(kNumNameParts - 1));
      ARROW_ASSIGN_OR_RAISE(std::unique_ptr<Buffer> offset_buff,
                            AllocateBuffer((tld.part_to_generate + 1) * sizeof(int32_t)));
      int32_t* offsets = reinterpret_cast<int32_t*>(offset_buff->mutable_data());
      offsets[0] = 0;
      for (int64_t irow = 0; irow < tld.part_to_generate; irow++) {
        size_t string_length = 0;
        for (int ipart = 0; ipart < 5; ipart++) {
          uint8_t name_part_index = static_cast<uint8_t>(dist(tld.rng));
          tld.string_indices[irow * 5 + ipart] = name_part_index;
          string_length += std::strlen(NameParts[name_part_index]);
        }
        // Add 4 because there is a space between each word (i.e. four spaces)
        offsets[irow + 1] = static_cast<int32_t>(offsets[irow] + string_length + 4);
      }
      // Add an extra byte for the space after in the very last string.
      ARROW_ASSIGN_OR_RAISE(std::unique_ptr<Buffer> string_buffer,
                            AllocateBuffer(offsets[tld.part_to_generate] + 1));
      char* strings = reinterpret_cast<char*>(string_buffer->mutable_data());
      for (int64_t irow = 0; irow < tld.part_to_generate; irow++) {
        char* row = strings + offsets[irow];
        for (int ipart = 0; ipart < 5; ipart++) {
          uint8_t name_part_index = tld.string_indices[irow * 5 + ipart];
          const char* part = NameParts[name_part_index];
          size_t length = std::strlen(part);
          std::memcpy(row, part, length);
          row += length;
          *row++ = ' ';
        }
      }
      ArrayData ad(kPartTypes[PART::P_NAME], tld.part_to_generate,
                   {nullptr, std::move(offset_buff), std::move(string_buffer)});
      Datum datum(ad);
      tld.part[PART::P_NAME] = std::move(datum);
    }
    return Status::OK();
  }

  Status P_MFGR(size_t thread_index) {
    ThreadLocalData& tld = thread_local_data_[thread_index];
    if (tld.part[PART::P_MFGR].kind() == Datum::NONE) {
      std::uniform_int_distribution<int> dist(1, 5);
      const char* manufacturer = "Manufacturer#";
      const size_t manufacturer_length = std::strlen(manufacturer);
      RETURN_NOT_OK(AllocatePartBatch(thread_index, PART::P_MFGR));
      char* p_mfgr = reinterpret_cast<char*>(
          tld.part[PART::P_MFGR].array()->buffers[1]->mutable_data());
      int32_t byte_width = kPartTypes[PART::P_MFGR]->byte_width();
      for (int64_t irow = 0; irow < tld.part_to_generate; irow++) {
        std::strncpy(p_mfgr + byte_width * irow, manufacturer, byte_width);
        char mfgr_id = '0' + dist(tld.rng);
        *(p_mfgr + byte_width * irow + manufacturer_length) = mfgr_id;
      }
    }
    return Status::OK();
  }

  Status P_BRAND(size_t thread_index) {
    ThreadLocalData& tld = thread_local_data_[thread_index];
    if (tld.part[PART::P_BRAND].kind() == Datum::NONE) {
      RETURN_NOT_OK(P_MFGR(thread_index));
      std::uniform_int_distribution<int> dist(1, 5);
      const char* brand = "Brand#";
      const size_t brand_length = std::strlen(brand);
      RETURN_NOT_OK(AllocatePartBatch(thread_index, PART::P_BRAND));
      const char* p_mfgr = reinterpret_cast<const char*>(
          tld.part[PART::P_MFGR].array()->buffers[1]->data());
      char* p_brand = reinterpret_cast<char*>(
          tld.part[PART::P_BRAND].array()->buffers[1]->mutable_data());
      int32_t byte_width = kPartTypes[PART::P_BRAND]->byte_width();
      int32_t mfgr_byte_width = kPartTypes[PART::P_MFGR]->byte_width();
      const size_t mfgr_id_offset = std::strlen("Manufacturer#");
      for (int64_t irow = 0; irow < tld.part_to_generate; irow++) {
        char* row = p_brand + byte_width * irow;
        char mfgr_id = *(p_mfgr + irow * mfgr_byte_width + mfgr_id_offset);
        char brand_id = '0' + dist(tld.rng);
        std::strncpy(row, brand, byte_width);
        *(row + brand_length) = mfgr_id;
        *(row + brand_length + 1) = brand_id;
        irow += 0;
      }
    }
    return Status::OK();
  }

  Status P_TYPE(size_t thread_index) {
    ThreadLocalData& tld = thread_local_data_[thread_index];
    if (tld.part[PART::P_TYPE].kind() == Datum::NONE) {
      using D = std::uniform_int_distribution<uint32_t>;
      D dists[] = {
          D{0, static_cast<uint8_t>(kNumTypes_1 - 1)},
          D{0, static_cast<uint8_t>(kNumTypes_2 - 1)},
          D{0, static_cast<uint8_t>(kNumTypes_3 - 1)},
      };

      const char** types[] = {Types_1, Types_2, Types_3};

      ARROW_ASSIGN_OR_RAISE(std::unique_ptr<Buffer> offset_buff,
                            AllocateBuffer((tld.part_to_generate + 1) * sizeof(int32_t)));
      int32_t* offsets = reinterpret_cast<int32_t*>(offset_buff->mutable_data());
      offsets[0] = 0;
      for (int64_t irow = 0; irow < tld.part_to_generate; irow++) {
        size_t string_length = 0;
        for (int ipart = 0; ipart < 3; ipart++) {
          uint8_t name_part_index = static_cast<uint8_t>(dists[ipart](tld.rng));
          tld.string_indices[irow * 3 + ipart] = name_part_index;
          string_length += std::strlen(types[ipart][name_part_index]);
        }
        offsets[irow + 1] = static_cast<int32_t>(offsets[irow] + string_length);
      }
      ARROW_ASSIGN_OR_RAISE(std::unique_ptr<Buffer> string_buffer,
                            AllocateBuffer(offsets[tld.part_to_generate]));
      char* strings = reinterpret_cast<char*>(string_buffer->mutable_data());
      for (int64_t irow = 0; irow < tld.part_to_generate; irow++) {
        char* row = strings + offsets[irow];
        for (int ipart = 0; ipart < 3; ipart++) {
          uint8_t name_part_index = tld.string_indices[irow * 3 + ipart];
          const char* part = types[ipart][name_part_index];
          size_t length = std::strlen(part);
          std::memcpy(row, part, length);
          row += length;
        }
      }
      ArrayData ad(kPartTypes[PART::P_TYPE], tld.part_to_generate,
                   {nullptr, std::move(offset_buff), std::move(string_buffer)});
      Datum datum(ad);
      tld.part[PART::P_TYPE] = std::move(datum);
    }
    return Status::OK();
  }

  Status P_SIZE(size_t thread_index) {
    ThreadLocalData& tld = thread_local_data_[thread_index];
    if (tld.part[PART::P_SIZE].kind() == Datum::NONE) {
      std::uniform_int_distribution<int32_t> dist(1, 50);
      RETURN_NOT_OK(AllocatePartBatch(thread_index, PART::P_SIZE));
      int32_t* p_size = reinterpret_cast<int32_t*>(
          tld.part[PART::P_SIZE].array()->buffers[1]->mutable_data());
      for (int64_t i = 0; i < tld.part_to_generate; i++) p_size[i] = dist(tld.rng);
    }
    return Status::OK();
  }

  Status P_CONTAINER(size_t thread_index) {
    ThreadLocalData& tld = thread_local_data_[thread_index];
    if (tld.part[PART::P_CONTAINER].kind() == Datum::NONE) {
      std::uniform_int_distribution<int> dist1(
          0, static_cast<uint8_t>(kNumContainers_1 - 1));
      std::uniform_int_distribution<int> dist2(
          0, static_cast<uint8_t>(kNumContainers_2 - 1));
      RETURN_NOT_OK(AllocatePartBatch(thread_index, PART::P_CONTAINER));
      char* p_container = reinterpret_cast<char*>(
          tld.part[PART::P_CONTAINER].array()->buffers[1]->mutable_data());
      int32_t byte_width = kPartTypes[PART::P_CONTAINER]->byte_width();
      for (int64_t irow = 0; irow < tld.part_to_generate; irow++) {
        int container1_idx = dist1(tld.rng);
        int container2_idx = dist2(tld.rng);
        const char* container1 = Containers_1[container1_idx];
        const char* container2 = Containers_2[container2_idx];
        size_t container1_length = std::strlen(container1);
        size_t container2_length = std::strlen(container2);

        char* row = p_container + byte_width * irow;
        std::strncpy(row, container1, byte_width);
        std::memcpy(row + container1_length, container2, container2_length);
      }
    }
    return Status::OK();
  }

  Status P_RETAILPRICE(size_t thread_index) {
    ThreadLocalData& tld = thread_local_data_[thread_index];
    if (tld.part[PART::P_RETAILPRICE].kind() == Datum::NONE) {
      RETURN_NOT_OK(P_PARTKEY(thread_index));
      RETURN_NOT_OK(AllocatePartBatch(thread_index, PART::P_RETAILPRICE));
      const int32_t* p_partkey = reinterpret_cast<const int32_t*>(
          tld.part[PART::P_PARTKEY].array()->buffers[1]->data());
      Decimal128* p_retailprice = reinterpret_cast<Decimal128*>(
          tld.part[PART::P_RETAILPRICE].array()->buffers[1]->mutable_data());
      for (int64_t irow = 0; irow < tld.part_to_generate; irow++) {
        int32_t partkey = p_partkey[irow];
        int64_t retail_price =
            (90000 + ((partkey / 10) % 20001) + 100 * (partkey % 1000));
        p_retailprice[irow] = {retail_price};
      }
    }
    return Status::OK();
  }

  Status P_COMMENT(size_t thread_index) {
    ThreadLocalData& tld = thread_local_data_[thread_index];
    if (tld.part[PART::P_COMMENT].kind() == Datum::NONE) {
      ARROW_ASSIGN_OR_RAISE(
          tld.part[PART::P_COMMENT],
          g_text.GenerateComments(tld.part_to_generate, 5, 22, tld.rng));
    }
    return Status::OK();
  }

  int64_t PartsuppBatchesToGenerate(size_t thread_index) {
    ThreadLocalData& tld = thread_local_data_[thread_index];
    int64_t ps_to_generate = kPartSuppRowsPerPart * tld.part_to_generate;
    int64_t num_batches = (ps_to_generate + batch_size_ - 1) / batch_size_;
    return num_batches;
  }

  Status InitPartsupp(size_t thread_index) {
    ThreadLocalData& tld = thread_local_data_[thread_index];
    tld.generated_partsupp.reset();
    int64_t num_batches = PartsuppBatchesToGenerate(thread_index);
    tld.partsupp.resize(num_batches);
    for (std::vector<Datum>& batch : tld.partsupp) {
      batch.resize(PARTSUPP::kNumCols);
      std::fill(batch.begin(), batch.end(), Datum());
    }
    return Status::OK();
  }

  Status AllocatePartSuppBatch(size_t thread_index, size_t ibatch, int column) {
    ThreadLocalData& tld = thread_local_data_[thread_index];
    int32_t byte_width = kPartsuppTypes[column]->byte_width();
    ARROW_ASSIGN_OR_RAISE(std::unique_ptr<Buffer> buff,
                          AllocateResizableBuffer(batch_size_ * byte_width));
    ArrayData ad(kPartsuppTypes[column], batch_size_, {nullptr, std::move(buff)});
    tld.partsupp[ibatch][column] = std::move(ad);
    return Status::OK();
  }

  Status SetPartSuppColumnSize(size_t thread_index, size_t ibatch, int column,
                               size_t new_size) {
    ThreadLocalData& tld = thread_local_data_[thread_index];
    int32_t byte_width = kPartsuppTypes[column]->byte_width();
    tld.partsupp[ibatch][column].array()->length = static_cast<int64_t>(new_size);
    ResizableBuffer* buff = checked_cast<ResizableBuffer*>(
        tld.partsupp[ibatch][column].array()->buffers[1].get());
    return buff->Resize(static_cast<int64_t>(new_size * byte_width),
                        /*shrink_to_fit=*/false);
  }

  Status PS_PARTKEY(size_t thread_index) {
    ThreadLocalData& tld = thread_local_data_[thread_index];
    if (!tld.generated_partsupp[PARTSUPP::PS_PARTKEY]) {
      tld.generated_partsupp[PARTSUPP::PS_PARTKEY] = true;
      RETURN_NOT_OK(P_PARTKEY(thread_index));
      const int32_t* p_partkey = reinterpret_cast<const int32_t*>(
          tld.part[PART::P_PARTKEY].array()->buffers[1]->data());

      size_t ibatch = 0;
      int64_t ipartsupp = 0;
      int64_t ipart = 0;
      int64_t ps_to_generate = kPartSuppRowsPerPart * tld.part_to_generate;
      for (int64_t irow = 0; irow < ps_to_generate; ibatch++) {
        RETURN_NOT_OK(AllocatePartSuppBatch(thread_index, ibatch, PARTSUPP::PS_PARTKEY));
        int32_t* ps_partkey =
            reinterpret_cast<int32_t*>(tld.partsupp[ibatch][PARTSUPP::PS_PARTKEY]
                                           .array()
                                           ->buffers[1]
                                           ->mutable_data());
        int64_t next_run = std::min(batch_size_, ps_to_generate - irow);

        int64_t batch_offset = 0;
        for (int64_t irun = 0; irun < next_run;) {
          for (; ipartsupp < kPartSuppRowsPerPart && irun < next_run; ipartsupp++, irun++)
            ps_partkey[batch_offset++] = p_partkey[ipart];

          if (ipartsupp == kPartSuppRowsPerPart) {
            ipartsupp = 0;
            ipart++;
          }
        }
        RETURN_NOT_OK(
            SetPartSuppColumnSize(thread_index, ibatch, PARTSUPP::PS_PARTKEY, next_run));
        irow += next_run;
      }
    }
    return Status::OK();
  }

  Status PS_SUPPKEY(size_t thread_index) {
    ThreadLocalData& tld = thread_local_data_[thread_index];
    if (!tld.generated_partsupp[PARTSUPP::PS_SUPPKEY]) {
      tld.generated_partsupp[PARTSUPP::PS_SUPPKEY] = true;
      RETURN_NOT_OK(P_PARTKEY(thread_index));
      const int32_t* p_partkey = reinterpret_cast<const int32_t*>(
          tld.part[PART::P_PARTKEY].array()->buffers[1]->data());

      size_t ibatch = 0;
      int64_t ipartsupp = 0;
      int64_t ipart = 0;
      int64_t ps_to_generate = kPartSuppRowsPerPart * tld.part_to_generate;
      const int32_t S = static_cast<int32_t>(scale_factor_ * 10000);
      for (int64_t irow = 0; irow < ps_to_generate; ibatch++) {
        RETURN_NOT_OK(AllocatePartSuppBatch(thread_index, ibatch, PARTSUPP::PS_SUPPKEY));
        int32_t* ps_suppkey =
            reinterpret_cast<int32_t*>(tld.partsupp[ibatch][PARTSUPP::PS_SUPPKEY]
                                           .array()
                                           ->buffers[1]
                                           ->mutable_data());
        int64_t next_run = std::min(batch_size_, ps_to_generate - irow);

        int64_t batch_offset = 0;
        for (int64_t irun = 0; irun < next_run;) {
          for (; ipartsupp < kPartSuppRowsPerPart && irun < next_run;
               ipartsupp++, irun++) {
            int32_t supplier = static_cast<int32_t>(ipartsupp);
            int32_t partkey = p_partkey[ipart];
            // Magic formula from TPCH spec.
            ps_suppkey[batch_offset++] =
                (partkey + (supplier * ((S / 4) + (partkey - 1) / S))) % S + 1;
          }
          if (ipartsupp == kPartSuppRowsPerPart) {
            ipartsupp = 0;
            ipart++;
          }
        }
        RETURN_NOT_OK(
            SetPartSuppColumnSize(thread_index, ibatch, PARTSUPP::PS_SUPPKEY, next_run));
        irow += next_run;
      }
    }
    return Status::OK();
  }

  Status PS_AVAILQTY(size_t thread_index) {
    ThreadLocalData& tld = thread_local_data_[thread_index];
    if (!tld.generated_partsupp[PARTSUPP::PS_AVAILQTY]) {
      tld.generated_partsupp[PARTSUPP::PS_AVAILQTY] = true;
      std::uniform_int_distribution<int32_t> dist(1, 9999);
      int64_t ps_to_generate = kPartSuppRowsPerPart * tld.part_to_generate;
      int64_t ibatch = 0;
      for (int64_t irow = 0; irow < ps_to_generate; ibatch++) {
        RETURN_NOT_OK(AllocatePartSuppBatch(thread_index, ibatch, PARTSUPP::PS_AVAILQTY));
        int32_t* ps_availqty =
            reinterpret_cast<int32_t*>(tld.partsupp[ibatch][PARTSUPP::PS_AVAILQTY]
                                           .array()
                                           ->buffers[1]
                                           ->mutable_data());
        int64_t next_run = std::min(batch_size_, ps_to_generate - irow);
        for (int64_t irun = 0; irun < next_run; irun++) ps_availqty[irun] = dist(tld.rng);

        RETURN_NOT_OK(
            SetPartSuppColumnSize(thread_index, ibatch, PARTSUPP::PS_AVAILQTY, next_run));
        irow += next_run;
      }
    }
    return Status::OK();
  }

  Status PS_SUPPLYCOST(size_t thread_index) {
    ThreadLocalData& tld = thread_local_data_[thread_index];
    if (!tld.generated_partsupp[PARTSUPP::PS_SUPPLYCOST]) {
      tld.generated_partsupp[PARTSUPP::PS_SUPPLYCOST] = true;
      std::uniform_int_distribution<int64_t> dist(100, 100000);
      int64_t ps_to_generate = kPartSuppRowsPerPart * tld.part_to_generate;
      int64_t ibatch = 0;
      for (int64_t irow = 0; irow < ps_to_generate; ibatch++) {
        RETURN_NOT_OK(
            AllocatePartSuppBatch(thread_index, ibatch, PARTSUPP::PS_SUPPLYCOST));
        Decimal128* ps_supplycost =
            reinterpret_cast<Decimal128*>(tld.partsupp[ibatch][PARTSUPP::PS_SUPPLYCOST]
                                              .array()
                                              ->buffers[1]
                                              ->mutable_data());
        int64_t next_run = std::min(batch_size_, ps_to_generate - irow);
        for (int64_t irun = 0; irun < next_run; irun++)
          ps_supplycost[irun] = {dist(tld.rng)};

        RETURN_NOT_OK(SetPartSuppColumnSize(thread_index, ibatch, PARTSUPP::PS_SUPPLYCOST,
                                            next_run));
        irow += next_run;
      }
    }
    return Status::OK();
  }

  Status PS_COMMENT(size_t thread_index) {
    ThreadLocalData& tld = thread_local_data_[thread_index];
    if (!tld.generated_partsupp[PARTSUPP::PS_COMMENT]) {
      tld.generated_partsupp[PARTSUPP::PS_COMMENT] = true;
      int64_t irow = 0;
      int64_t ps_to_generate = kPartSuppRowsPerPart * tld.part_to_generate;
      for (size_t ibatch = 0; ibatch < tld.partsupp.size(); ibatch++) {
        int64_t num_rows = std::min(batch_size_, ps_to_generate - irow);
        ARROW_ASSIGN_OR_RAISE(tld.partsupp[ibatch][PARTSUPP::PS_COMMENT],
                              g_text.GenerateComments(num_rows, 49, 198, tld.rng));
        irow += num_rows;
      }
    }
    return Status::OK();
  }

  struct ThreadLocalData {
    std::vector<Datum> part;
    std::vector<int8_t> string_indices;
    int64_t part_to_generate{0};
    int64_t partkey_start{0};

    std::vector<std::vector<Datum>> partsupp;
    std::bitset<PARTSUPP::kNumCols> generated_partsupp;
    random::pcg32_fast rng;
  };
  std::vector<ThreadLocalData> thread_local_data_;

  bool inited_ = false;
  std::mutex part_output_queue_mutex_;
  std::mutex partsupp_output_queue_mutex_;
  std::queue<ExecBatch> part_output_queue_;
  std::queue<ExecBatch> partsupp_output_queue_;
  int64_t batch_size_{0};
  double scale_factor_{0};
  int64_t part_rows_to_generate_{0};
  int64_t part_rows_generated_{0};
  std::vector<int> part_cols_;
  std::vector<int> partsupp_cols_;

  std::atomic<int64_t> part_batches_generated_ = {0};
  std::atomic<int64_t> partsupp_batches_generated_ = {0};
  static constexpr int64_t kPartSuppRowsPerPart = 4;
};

class OrdersAndLineItemGenerator {
 public:
  Status Init(size_t num_threads, int64_t batch_size, double scale_factor, int64_t seed) {
    if (!inited_) {
      inited_ = true;
      batch_size_ = batch_size;
      scale_factor_ = scale_factor;

      thread_local_data_.resize(num_threads);
      random::pcg64_fast seed_rng(seed);
      for (ThreadLocalData& tld : thread_local_data_) {
        tld.items_per_order.resize(batch_size_);
        tld.rng.seed(kSeedDist(seed_rng));
      }
      orders_rows_to_generate_ = static_cast<int64_t>(scale_factor_ * 150000 * 10);
    }
    return Status::OK();
  }

  int64_t orders_batches_generated() const { return orders_batches_generated_.load(); }

  int64_t lineitem_batches_generated() const {
    return lineitem_batches_generated_.load();
  }

  Result<std::shared_ptr<Schema>> SetOrdersOutputColumns(
      const std::vector<std::string>& cols) {
    return SetOutputColumns(cols, kOrdersTypes, kOrdersNameMap, orders_cols_);
  }

  Result<std::shared_ptr<Schema>> SetLineItemOutputColumns(
      const std::vector<std::string>& cols) {
    return SetOutputColumns(cols, kLineitemTypes, kLineitemNameMap, lineitem_cols_);
  }

  Result<std::optional<ExecBatch>> NextOrdersBatch(size_t thread_index) {
    ThreadLocalData& tld = thread_local_data_[thread_index];
    {
      std::lock_guard<std::mutex> lock(orders_output_queue_mutex_);
      if (!orders_output_queue_.empty()) {
        ExecBatch batch = std::move(orders_output_queue_.front());
        orders_output_queue_.pop();
        return batch;
      } else if (orders_rows_generated_ == orders_rows_to_generate_) {
        return std::nullopt;
      } else {
        tld.orderkey_start = orders_rows_generated_;
        tld.orders_to_generate =
            std::min(batch_size_, orders_rows_to_generate_ - orders_rows_generated_);
        orders_rows_generated_ += tld.orders_to_generate;
        orders_batches_generated_.fetch_add(1);
        tld.first_batch_offset = 0;
        RETURN_NOT_OK(GenerateRowCounts(thread_index));
        lineitem_batches_generated_.fetch_add(static_cast<int64_t>(tld.lineitem.size()));
        ARROW_DCHECK(orders_rows_generated_ <= orders_rows_to_generate_);
      }
    }
    tld.orders.resize(ORDERS::kNumCols);
    std::fill(tld.orders.begin(), tld.orders.end(), Datum());
    tld.generated_lineitem.reset();

    for (int col : orders_cols_) RETURN_NOT_OK(kOrdersGenerators[col](thread_index));
    for (int col : lineitem_cols_) RETURN_NOT_OK(kLineitemGenerators[col](thread_index));

    std::vector<Datum> orders_result(orders_cols_.size());
    for (size_t i = 0; i < orders_cols_.size(); i++) {
      int col_idx = orders_cols_[i];
      orders_result[i] = tld.orders[col_idx];
    }
    if (!lineitem_cols_.empty()) {
      std::vector<ExecBatch> lineitem_results;
      for (size_t ibatch = 0; ibatch < tld.lineitem.size(); ibatch++) {
        std::vector<Datum> lineitem_result(lineitem_cols_.size());
        for (size_t icol = 0; icol < lineitem_cols_.size(); icol++) {
          int col_idx = lineitem_cols_[icol];
          lineitem_result[icol] = tld.lineitem[ibatch][col_idx];
        }
        ARROW_ASSIGN_OR_RAISE(ExecBatch eb, ExecBatch::Make(std::move(lineitem_result)));
        lineitem_results.emplace_back(std::move(eb));
      }
      {
        std::lock_guard<std::mutex> guard(lineitem_output_queue_mutex_);
        for (ExecBatch& eb : lineitem_results) {
          lineitem_output_queue_.emplace(std::move(eb));
        }
      }
    }
    return ExecBatch::Make(std::move(orders_result));
  }

  Result<std::optional<ExecBatch>> NextLineItemBatch(size_t thread_index) {
    ThreadLocalData& tld = thread_local_data_[thread_index];
    ExecBatch queued;
    bool from_queue = false;
    {
      std::lock_guard<std::mutex> lock(lineitem_output_queue_mutex_);
      if (!lineitem_output_queue_.empty()) {
        queued = std::move(lineitem_output_queue_.front());
        lineitem_output_queue_.pop();
        from_queue = true;
      }
    }
    tld.first_batch_offset = 0;
    if (from_queue) {
      ARROW_DCHECK(queued.length <= batch_size_);
      tld.first_batch_offset = queued.length;
      if (queued.length == batch_size_) return queued;
    }
    {
      std::lock_guard<std::mutex> lock(orders_output_queue_mutex_);
      if (orders_rows_generated_ == orders_rows_to_generate_) {
        if (from_queue) return queued;
        return std::nullopt;
      }

      tld.orderkey_start = orders_rows_generated_;
      tld.orders_to_generate =
          std::min(batch_size_, orders_rows_to_generate_ - orders_rows_generated_);
      orders_rows_generated_ += tld.orders_to_generate;
      orders_batches_generated_.fetch_add(1);
      RETURN_NOT_OK(GenerateRowCounts(thread_index));
      lineitem_batches_generated_.fetch_add(
          static_cast<int64_t>(tld.lineitem.size() - from_queue));
      ARROW_DCHECK(orders_rows_generated_ <= orders_rows_to_generate_);
    }
    tld.orders.resize(ORDERS::kNumCols);
    std::fill(tld.orders.begin(), tld.orders.end(), Datum());
    tld.generated_lineitem.reset();
    if (from_queue) {
      for (size_t i = 0; i < lineitem_cols_.size(); i++)
        if (tld.lineitem[0][lineitem_cols_[i]].kind() == Datum::NONE)
          tld.lineitem[0][lineitem_cols_[i]] = std::move(queued[i]);
    }

    for (int col : orders_cols_) RETURN_NOT_OK(kOrdersGenerators[col](thread_index));
    for (int col : lineitem_cols_) RETURN_NOT_OK(kLineitemGenerators[col](thread_index));

    if (!orders_cols_.empty()) {
      std::vector<Datum> orders_result(orders_cols_.size());
      for (size_t i = 0; i < orders_cols_.size(); i++) {
        int col_idx = orders_cols_[i];
        orders_result[i] = tld.orders[col_idx];
      }
      ARROW_ASSIGN_OR_RAISE(ExecBatch orders_batch,
                            ExecBatch::Make(std::move(orders_result)));
      {
        std::lock_guard<std::mutex> lock(orders_output_queue_mutex_);
        orders_output_queue_.emplace(std::move(orders_batch));
      }
    }
    std::vector<ExecBatch> lineitem_results;
    for (size_t ibatch = 0; ibatch < tld.lineitem.size(); ibatch++) {
      std::vector<Datum> lineitem_result(lineitem_cols_.size());
      for (size_t icol = 0; icol < lineitem_cols_.size(); icol++) {
        int col_idx = lineitem_cols_[icol];
        lineitem_result[icol] = tld.lineitem[ibatch][col_idx];
      }
      ARROW_ASSIGN_OR_RAISE(ExecBatch eb, ExecBatch::Make(std::move(lineitem_result)));
      lineitem_results.emplace_back(std::move(eb));
    }
    // Return the first batch, enqueue the rest.
    {
      std::lock_guard<std::mutex> lock(lineitem_output_queue_mutex_);
      for (size_t i = 1; i < lineitem_results.size(); i++)
        lineitem_output_queue_.emplace(std::move(lineitem_results[i]));
    }
    return std::move(lineitem_results[0]);
  }

 private:
#define FOR_EACH_ORDERS_COLUMN(F) \
  F(O_ORDERKEY)                   \
  F(O_CUSTKEY)                    \
  F(O_ORDERSTATUS)                \
  F(O_TOTALPRICE)                 \
  F(O_ORDERDATE)                  \
  F(O_ORDERPRIORITY)              \
  F(O_CLERK)                      \
  F(O_SHIPPRIORITY)               \
  F(O_COMMENT)

#define FOR_EACH_LINEITEM_COLUMN(F) \
  F(L_ORDERKEY)                     \
  F(L_PARTKEY)                      \
  F(L_SUPPKEY)                      \
  F(L_LINENUMBER)                   \
  F(L_QUANTITY)                     \
  F(L_EXTENDEDPRICE)                \
  F(L_DISCOUNT)                     \
  F(L_TAX)                          \
  F(L_RETURNFLAG)                   \
  F(L_LINESTATUS)                   \
  F(L_SHIPDATE)                     \
  F(L_COMMITDATE)                   \
  F(L_RECEIPTDATE)                  \
  F(L_SHIPINSTRUCT)                 \
  F(L_SHIPMODE)                     \
  F(L_COMMENT)

#define MAKE_ENUM(col) col,
  struct ORDERS {
    enum {
      FOR_EACH_ORDERS_COLUMN(MAKE_ENUM) kNumCols,
    };
  };
  struct LINEITEM {
    enum {
      FOR_EACH_LINEITEM_COLUMN(MAKE_ENUM) kNumCols,
    };
  };

#define MAKE_STRING_MAP(col) {#col, ORDERS::col},
  const std::unordered_map<std::string, int> kOrdersNameMap = {
      FOR_EACH_ORDERS_COLUMN(MAKE_STRING_MAP)};
#undef MAKE_STRING_MAP
#define MAKE_STRING_MAP(col) {#col, LINEITEM::col},
  const std::unordered_map<std::string, int> kLineitemNameMap = {
      FOR_EACH_LINEITEM_COLUMN(MAKE_STRING_MAP)};
#undef MAKE_STRING_MAP
#define MAKE_FN_ARRAY(col) \
  [this](size_t thread_index) { return this->col(thread_index); },
  const std::vector<GenerateColumnFn> kOrdersGenerators = {
      FOR_EACH_ORDERS_COLUMN(MAKE_FN_ARRAY)};
  const std::vector<GenerateColumnFn> kLineitemGenerators = {
      FOR_EACH_LINEITEM_COLUMN(MAKE_FN_ARRAY)};
#undef MAKE_FN_ARRAY
#undef FOR_EACH_LINEITEM_COLUMN
#undef FOR_EACH_ORDERS_COLUMN

  const std::vector<std::shared_ptr<DataType>> kOrdersTypes = {int32(),
                                                               int32(),
                                                               fixed_size_binary(1),
                                                               decimal128(12, 2),
                                                               date32(),
                                                               fixed_size_binary(15),
                                                               fixed_size_binary(15),
                                                               int32(),
                                                               utf8()};

  const std::vector<std::shared_ptr<DataType>> kLineitemTypes = {
      int32(),
      int32(),
      int32(),
      int32(),
      decimal128(12, 2),
      decimal128(12, 2),
      decimal128(12, 2),
      decimal128(12, 2),
      fixed_size_binary(1),
      fixed_size_binary(1),
      date32(),
      date32(),
      date32(),
      fixed_size_binary(25),
      fixed_size_binary(10),
      utf8(),
  };

  Status AllocateOrdersBatch(size_t thread_index, int column) {
    ThreadLocalData& tld = thread_local_data_[thread_index];
    ARROW_DCHECK(tld.orders[column].kind() == Datum::NONE);
    int32_t byte_width = kOrdersTypes[column]->byte_width();
    ARROW_ASSIGN_OR_RAISE(std::unique_ptr<Buffer> buff,
                          AllocateBuffer(tld.orders_to_generate * byte_width));
    ArrayData ad(kOrdersTypes[column], tld.orders_to_generate,
                 {nullptr, std::move(buff)});
    tld.orders[column] = std::move(ad);
    return Status::OK();
  }

  Status O_ORDERKEY(size_t thread_index) {
    ThreadLocalData& tld = thread_local_data_[thread_index];
    if (tld.orders[ORDERS::O_ORDERKEY].kind() == Datum::NONE) {
      RETURN_NOT_OK(AllocateOrdersBatch(thread_index, ORDERS::O_ORDERKEY));
      int32_t* o_orderkey = reinterpret_cast<int32_t*>(
          tld.orders[ORDERS::O_ORDERKEY].array()->buffers[1]->mutable_data());
      for (int64_t i = 0; i < tld.orders_to_generate; i++) {
        int32_t orderkey_index = static_cast<int32_t>(tld.orderkey_start + i);
        int32_t index_of_run = orderkey_index / 8;
        int32_t index_in_run = orderkey_index % 8;
        o_orderkey[i] = (index_of_run * 32 + index_in_run + 1);
        ARROW_DCHECK(1 <= o_orderkey[i] && o_orderkey[i] <= 4 * orders_rows_to_generate_);
      }
    }
    return Status::OK();
  }

  Status O_CUSTKEY(size_t thread_index) {
    ThreadLocalData& tld = thread_local_data_[thread_index];
    if (tld.orders[ORDERS::O_CUSTKEY].kind() == Datum::NONE) {
      RETURN_NOT_OK(AllocateOrdersBatch(thread_index, ORDERS::O_CUSTKEY));

      // Spec says it must be a random number between 1 and SF*150000 that is not
      // divisible by 3. Rather than repeatedly generating numbers until we get to
      // a non-divisible-by-3 number, we just generate a number between
      // 0 and SF * 50000 - 1, multiply by 3, and then add either 1 or 2.
      int32_t sf_50k = static_cast<int32_t>(scale_factor_ * 50000);
      std::uniform_int_distribution<int32_t> base_dist(0, sf_50k - 1);
      std::uniform_int_distribution<int32_t> offset_dist(1, 2);
      int32_t* o_custkey = reinterpret_cast<int32_t*>(
          tld.orders[ORDERS::O_CUSTKEY].array()->buffers[1]->mutable_data());
      for (int64_t i = 0; i < tld.orders_to_generate; i++)
        o_custkey[i] = 3 * base_dist(tld.rng) + offset_dist(tld.rng);
    }
    return Status::OK();
  }

  Status O_ORDERSTATUS(size_t thread_index) {
    ThreadLocalData& tld = thread_local_data_[thread_index];
    if (tld.orders[ORDERS::O_ORDERSTATUS].kind() == Datum::NONE) {
      RETURN_NOT_OK(L_LINESTATUS(thread_index));
      RETURN_NOT_OK(AllocateOrdersBatch(thread_index, ORDERS::O_ORDERSTATUS));

      char* o_orderstatus = reinterpret_cast<char*>(
          tld.orders[ORDERS::O_ORDERSTATUS].array()->buffers[1]->mutable_data());

      size_t batch_offset = tld.first_batch_offset;
      size_t ibatch = 0;
      size_t iorder = 0;
      int32_t iline = 0;
      bool all_f = true;
      bool all_o = true;
      for (int64_t irow = 0; irow < tld.lineitem_to_generate; ibatch++) {
        const char* l_linestatus = reinterpret_cast<const char*>(
            tld.lineitem[ibatch][LINEITEM::L_LINESTATUS].array()->buffers[1]->data());

        int64_t remaining_in_batch = static_cast<int64_t>(batch_size_ - batch_offset);
        int64_t next_run = std::min(tld.lineitem_to_generate - irow, remaining_in_batch);
        for (int64_t irun = 0; irun < next_run;) {
          for (; iline < tld.items_per_order[iorder] && irun < next_run;
               iline++, irun++, batch_offset++) {
            all_f &= l_linestatus[batch_offset] == 'F';
            all_o &= l_linestatus[batch_offset] == 'O';
          }
          if (iline == tld.items_per_order[iorder]) {
            iline = 0;
            ARROW_DCHECK(!(all_f && all_o));
            if (all_f)
              o_orderstatus[iorder] = 'F';
            else if (all_o)
              o_orderstatus[iorder] = 'O';
            else
              o_orderstatus[iorder] = 'P';
            iorder++;
          }
        }
        irow += next_run;
        batch_offset = 0;
      }
    }
    return Status::OK();
  }

  Status O_TOTALPRICE(size_t thread_index) {
    ThreadLocalData& tld = thread_local_data_[thread_index];
    if (tld.orders[ORDERS::O_TOTALPRICE].kind() == Datum::NONE) {
      RETURN_NOT_OK(L_EXTENDEDPRICE(thread_index));
      RETURN_NOT_OK(L_TAX(thread_index));
      RETURN_NOT_OK(L_DISCOUNT(thread_index));
      RETURN_NOT_OK(AllocateOrdersBatch(thread_index, ORDERS::O_TOTALPRICE));

      size_t batch_offset = tld.first_batch_offset;
      size_t ibatch = 0;
      size_t iorder = 0;
      int32_t iline = 0;
      int64_t sum = 0;
      Decimal128* o_totalprice = reinterpret_cast<Decimal128*>(
          tld.orders[ORDERS::O_TOTALPRICE].array()->buffers[1]->mutable_data());
      for (int64_t irow = 0; irow < tld.lineitem_to_generate; ibatch++) {
        int64_t remaining_in_batch = static_cast<int64_t>(batch_size_ - batch_offset);
        int64_t next_run = std::min(tld.lineitem_to_generate - irow, remaining_in_batch);

        const Decimal128* l_extendedprice = reinterpret_cast<const Decimal128*>(
            tld.lineitem[ibatch][LINEITEM::L_EXTENDEDPRICE].array()->buffers[1]->data());
        const Decimal128* l_tax = reinterpret_cast<const Decimal128*>(
            tld.lineitem[ibatch][LINEITEM::L_TAX].array()->buffers[1]->data());
        const Decimal128* l_discount = reinterpret_cast<const Decimal128*>(
            tld.lineitem[ibatch][LINEITEM::L_DISCOUNT].array()->buffers[1]->data());

        for (int64_t irun = 0; irun < next_run;) {
          for (; iline < tld.items_per_order[iorder] && irun < next_run;
               iline++, irun++, batch_offset++) {
            int64_t eprice = static_cast<int64_t>(l_extendedprice[batch_offset]);
            int64_t tax = static_cast<int64_t>(l_tax[batch_offset]);
            int64_t discount = static_cast<int64_t>(l_discount[batch_offset]);
            sum += (eprice * (100 + tax) * (100 - discount));
          }
          if (iline == tld.items_per_order[iorder]) {
            sum /= 100 * 100;
            o_totalprice[iorder] = {sum};
            iline = 0;
            iorder++;
          }
        }
        irow += next_run;
        batch_offset = 0;
      }
    }
    return Status::OK();
  }

  Status O_ORDERDATE(size_t thread_index) {
    ThreadLocalData& tld = thread_local_data_[thread_index];
    if (tld.orders[ORDERS::O_ORDERDATE].kind() == Datum::NONE) {
      RETURN_NOT_OK(AllocateOrdersBatch(thread_index, ORDERS::O_ORDERDATE));

      std::uniform_int_distribution<uint32_t> dist(kStartDate, kEndDate - 151);
      uint32_t* o_orderdate = reinterpret_cast<uint32_t*>(
          tld.orders[ORDERS::O_ORDERDATE].array()->buffers[1]->mutable_data());
      for (int64_t i = 0; i < tld.orders_to_generate; i++) o_orderdate[i] = dist(tld.rng);
    }
    return Status::OK();
  }

  Status O_ORDERPRIORITY(size_t thread_index) {
    ThreadLocalData& tld = thread_local_data_[thread_index];
    if (tld.orders[ORDERS::O_ORDERPRIORITY].kind() == Datum::NONE) {
      RETURN_NOT_OK(AllocateOrdersBatch(thread_index, ORDERS::O_ORDERPRIORITY));
      int32_t byte_width = kOrdersTypes[ORDERS::O_ORDERPRIORITY]->byte_width();
      std::uniform_int_distribution<int32_t> dist(0, kNumPriorities - 1);
      char* o_orderpriority = reinterpret_cast<char*>(
          tld.orders[ORDERS::O_ORDERPRIORITY].array()->buffers[1]->mutable_data());
      for (int64_t i = 0; i < tld.orders_to_generate; i++) {
        const char* str = Priorities[dist(tld.rng)];
        std::strncpy(o_orderpriority + i * byte_width, str, byte_width);
      }
    }
    return Status::OK();
  }

  Status O_CLERK(size_t thread_index) {
    ThreadLocalData& tld = thread_local_data_[thread_index];
    if (tld.orders[ORDERS::O_CLERK].kind() == Datum::NONE) {
      RETURN_NOT_OK(AllocateOrdersBatch(thread_index, ORDERS::O_CLERK));
      int32_t byte_width = kOrdersTypes[ORDERS::O_CLERK]->byte_width();
      int64_t max_clerk_id = static_cast<int64_t>(scale_factor_ * 1000);
      std::uniform_int_distribution<int64_t> dist(1, max_clerk_id);
      char* o_clerk = reinterpret_cast<char*>(
          tld.orders[ORDERS::O_CLERK].array()->buffers[1]->mutable_data());
      for (int64_t i = 0; i < tld.orders_to_generate; i++) {
        const char* clerk = "Clerk#";
        const size_t clerk_length = std::strlen(clerk);
        int64_t clerk_number = dist(tld.rng);
        char* output = o_clerk + i * byte_width;
        std::strncpy(output, clerk, byte_width);
        AppendNumberPaddedToNineDigits(output + clerk_length, clerk_number);
      }
    }
    return Status::OK();
  }

  Status O_SHIPPRIORITY(size_t thread_index) {
    ThreadLocalData& tld = thread_local_data_[thread_index];
    if (tld.orders[ORDERS::O_SHIPPRIORITY].kind() == Datum::NONE) {
      RETURN_NOT_OK(AllocateOrdersBatch(thread_index, ORDERS::O_SHIPPRIORITY));
      int32_t* o_shippriority = reinterpret_cast<int32_t*>(
          tld.orders[ORDERS::O_SHIPPRIORITY].array()->buffers[1]->mutable_data());
      std::memset(o_shippriority, 0, tld.orders_to_generate * sizeof(int32_t));
    }
    return Status::OK();
  }

  Status O_COMMENT(size_t thread_index) {
    ThreadLocalData& tld = thread_local_data_[thread_index];
    if (tld.orders[ORDERS::O_COMMENT].kind() == Datum::NONE) {
      ARROW_ASSIGN_OR_RAISE(
          tld.orders[ORDERS::O_COMMENT],
          g_text.GenerateComments(tld.orders_to_generate, 19, 78, tld.rng));
    }
    return Status::OK();
  }

  Status GenerateRowCounts(size_t thread_index) {
    ThreadLocalData& tld = thread_local_data_[thread_index];
    std::uniform_int_distribution<int> length_dist(1, 7);
    tld.lineitem_to_generate = 0;
    tld.items_per_order.clear();
    for (int64_t i = 0; i < tld.orders_to_generate; i++) {
      int length = length_dist(tld.rng);
      tld.items_per_order.push_back(length);
      tld.lineitem_to_generate += length;
    }
    int64_t num_batches =
        (tld.first_batch_offset + tld.lineitem_to_generate + batch_size_ - 1) /
        batch_size_;
    tld.lineitem.resize(num_batches);
    for (std::vector<Datum>& batch : tld.lineitem) {
      batch.resize(LINEITEM::kNumCols);
      std::fill(batch.begin(), batch.end(), Datum());
    }
    return Status::OK();
  }

  Status AllocateLineItemBufferIfNeeded(size_t thread_index, size_t ibatch, int column,
                                        size_t& out_batch_offset) {
    ThreadLocalData& tld = thread_local_data_[thread_index];
    if (tld.lineitem[ibatch][column].kind() == Datum::NONE) {
      ARROW_DCHECK(ibatch != 0 || tld.first_batch_offset == 0);
      int32_t byte_width = kLineitemTypes[column]->byte_width();
      ARROW_ASSIGN_OR_RAISE(std::unique_ptr<Buffer> buff,
                            AllocateResizableBuffer(batch_size_ * byte_width));
      ArrayData ad(kLineitemTypes[column], batch_size_, {nullptr, std::move(buff)});
      tld.lineitem[ibatch][column] = std::move(ad);
      out_batch_offset = 0;
    }
    if (ibatch == 0) out_batch_offset = tld.first_batch_offset;

    return Status::OK();
  }

  Status SetLineItemColumnSize(size_t thread_index, size_t ibatch, int column,
                               size_t new_size) {
    ThreadLocalData& tld = thread_local_data_[thread_index];
    int32_t byte_width = kLineitemTypes[column]->byte_width();
    tld.lineitem[ibatch][column].array()->length = static_cast<int64_t>(new_size);
    ResizableBuffer* buff = checked_cast<ResizableBuffer*>(
        tld.lineitem[ibatch][column].array()->buffers[1].get());
    return buff->Resize(static_cast<int64_t>(new_size * byte_width),
                        /*shrink_to_fit=*/false);
  }

  Status L_ORDERKEY(size_t thread_index) {
    ThreadLocalData& tld = thread_local_data_[thread_index];
    if (!tld.generated_lineitem[LINEITEM::L_ORDERKEY]) {
      tld.generated_lineitem[LINEITEM::L_ORDERKEY] = true;
      RETURN_NOT_OK(O_ORDERKEY(thread_index));
      const int32_t* o_orderkey = reinterpret_cast<const int32_t*>(
          tld.orders[ORDERS::O_ORDERKEY].array()->buffers[1]->data());

      size_t ibatch = 0;
      size_t iorder = 0;
      int32_t iline = 0;
      for (int64_t irow = 0; irow < tld.lineitem_to_generate; ibatch++) {
        size_t batch_offset;
        RETURN_NOT_OK(AllocateLineItemBufferIfNeeded(thread_index, ibatch,
                                                     LINEITEM::L_ORDERKEY, batch_offset));
        int32_t* l_linenumber =
            reinterpret_cast<int32_t*>(tld.lineitem[ibatch][LINEITEM::L_ORDERKEY]
                                           .array()
                                           ->buffers[1]
                                           ->mutable_data());
        int64_t remaining_in_batch = static_cast<int64_t>(batch_size_ - batch_offset);
        int64_t next_run = std::min(tld.lineitem_to_generate - irow, remaining_in_batch);
        for (int64_t irun = 0; irun < next_run;) {
          for (; iline < tld.items_per_order[iorder] && irun < next_run; iline++, irun++)
            l_linenumber[batch_offset++] = o_orderkey[iorder];
          if (iline == tld.items_per_order[iorder]) {
            iline = 0;
            iorder++;
          }
        }
        irow += next_run;
        RETURN_NOT_OK(SetLineItemColumnSize(thread_index, ibatch, LINEITEM::L_ORDERKEY,
                                            batch_offset));
      }
    }
    return Status::OK();
  }

  Status L_PARTKEY(size_t thread_index) {
    ThreadLocalData& tld = thread_local_data_[thread_index];
    if (!tld.generated_lineitem[LINEITEM::L_PARTKEY]) {
      tld.generated_lineitem[LINEITEM::L_PARTKEY] = true;

      size_t ibatch = 0;
      int32_t max_partkey = static_cast<int32_t>(scale_factor_ * 200000);
      std::uniform_int_distribution<int32_t> dist(1, max_partkey);
      for (int64_t irow = 0; irow < tld.lineitem_to_generate; ibatch++) {
        size_t batch_offset;
        RETURN_NOT_OK(AllocateLineItemBufferIfNeeded(thread_index, ibatch,
                                                     LINEITEM::L_PARTKEY, batch_offset));
        int64_t remaining_in_batch = static_cast<int64_t>(batch_size_ - batch_offset);
        int64_t next_run = std::min(tld.lineitem_to_generate - irow, remaining_in_batch);
        int32_t* l_partkey =
            reinterpret_cast<int32_t*>(tld.lineitem[ibatch][LINEITEM::L_PARTKEY]
                                           .array()
                                           ->buffers[1]
                                           ->mutable_data());
        for (int64_t i = 0; i < next_run; i++, batch_offset++)
          l_partkey[batch_offset] = dist(tld.rng);

        irow += next_run;
        RETURN_NOT_OK(SetLineItemColumnSize(thread_index, ibatch, LINEITEM::L_PARTKEY,
                                            batch_offset));
      }
    }
    return Status::OK();
  }

  Status L_SUPPKEY(size_t thread_index) {
    ThreadLocalData& tld = thread_local_data_[thread_index];
    if (!tld.generated_lineitem[LINEITEM::L_SUPPKEY]) {
      tld.generated_lineitem[LINEITEM::L_SUPPKEY] = true;
      RETURN_NOT_OK(L_PARTKEY(thread_index));

      size_t ibatch = 0;
      std::uniform_int_distribution<int32_t> dist(0, 3);
      const int32_t S = static_cast<int32_t>(scale_factor_ * 10000);
      for (int64_t irow = 0; irow < tld.lineitem_to_generate; ibatch++) {
        size_t batch_offset = 0;
        RETURN_NOT_OK(AllocateLineItemBufferIfNeeded(thread_index, ibatch,
                                                     LINEITEM::L_SUPPKEY, batch_offset));
        int64_t remaining_in_batch = static_cast<int64_t>(batch_size_ - batch_offset);
        int64_t next_run = std::min(tld.lineitem_to_generate - irow, remaining_in_batch);
        int32_t* l_suppkey =
            reinterpret_cast<int32_t*>(tld.lineitem[ibatch][LINEITEM::L_SUPPKEY]
                                           .array()
                                           ->buffers[1]
                                           ->mutable_data());
        const int32_t* l_partkey = reinterpret_cast<const int32_t*>(
            tld.lineitem[ibatch][LINEITEM::L_PARTKEY].array()->buffers[1]->data());
        for (int64_t i = 0; i < next_run; i++) {
          int32_t supplier = dist(tld.rng);
          int32_t partkey = l_partkey[batch_offset];
          // Fun fact: the parentheses for this expression are unbalanced in the TPC-H
          // spec.
          l_suppkey[batch_offset++] =
              (partkey + (supplier * ((S / 4) + (partkey - 1) / S))) % S + 1;
        }
        irow += next_run;
        RETURN_NOT_OK(SetLineItemColumnSize(thread_index, ibatch, LINEITEM::L_SUPPKEY,
                                            batch_offset));
      }
    }
    return Status::OK();
  }

  Status L_LINENUMBER(size_t thread_index) {
    ThreadLocalData& tld = thread_local_data_[thread_index];
    if (!tld.generated_lineitem[LINEITEM::L_LINENUMBER]) {
      tld.generated_lineitem[LINEITEM::L_LINENUMBER] = true;
      size_t ibatch = 0;
      size_t iorder = 0;
      int32_t iline = 0;
      for (int64_t irow = 0; irow < tld.lineitem_to_generate; ibatch++) {
        size_t batch_offset;
        RETURN_NOT_OK(AllocateLineItemBufferIfNeeded(
            thread_index, ibatch, LINEITEM::L_LINENUMBER, batch_offset));
        int64_t remaining_in_batch = static_cast<int64_t>(batch_size_ - batch_offset);
        int64_t next_run = std::min(tld.lineitem_to_generate - irow, remaining_in_batch);
        int32_t* l_linenumber =
            reinterpret_cast<int32_t*>(tld.lineitem[ibatch][LINEITEM::L_LINENUMBER]
                                           .array()
                                           ->buffers[1]
                                           ->mutable_data());
        for (int64_t irun = 0; irun < next_run;) {
          for (; iline < tld.items_per_order[iorder] && irun < next_run;
               iline++, irun++) {
            l_linenumber[batch_offset++] = (iline + 1);
            ARROW_DCHECK(1 <= (iline + 1) && (iline + 1) <= 7);
          }
          if (iline == tld.items_per_order[iorder]) {
            iline = 0;
            iorder++;
          }
        }
        irow += next_run;
        RETURN_NOT_OK(SetLineItemColumnSize(thread_index, ibatch, LINEITEM::L_LINENUMBER,
                                            batch_offset));
      }
    }
    return Status::OK();
  }

  Status L_QUANTITY(size_t thread_index) {
    ThreadLocalData& tld = thread_local_data_[thread_index];
    if (!tld.generated_lineitem[LINEITEM::L_QUANTITY]) {
      tld.generated_lineitem[LINEITEM::L_QUANTITY] = true;

      size_t ibatch = 0;
      std::uniform_int_distribution<int64_t> dist(1, 50);
      for (int64_t irow = 0; irow < tld.lineitem_to_generate; ibatch++) {
        size_t batch_offset;
        RETURN_NOT_OK(AllocateLineItemBufferIfNeeded(thread_index, ibatch,
                                                     LINEITEM::L_QUANTITY, batch_offset));
        int64_t remaining_in_batch = static_cast<int64_t>(batch_size_ - batch_offset);
        int64_t next_run = std::min(tld.lineitem_to_generate - irow, remaining_in_batch);
        Decimal128* l_quantity =
            reinterpret_cast<Decimal128*>(tld.lineitem[ibatch][LINEITEM::L_QUANTITY]
                                              .array()
                                              ->buffers[1]
                                              ->mutable_data());
        for (int64_t i = 0; i < next_run; i++) {
          // Multiply by 100 because the type is decimal(12, 2), so the decimal goes after
          // two digits
          int64_t quantity = dist(tld.rng) * 100;
          l_quantity[batch_offset++] = {quantity};
        }
        irow += next_run;
        RETURN_NOT_OK(SetLineItemColumnSize(thread_index, ibatch, LINEITEM::L_QUANTITY,
                                            batch_offset));
      }
    }
    return Status::OK();
  }

  Status L_EXTENDEDPRICE(size_t thread_index) {
    ThreadLocalData& tld = thread_local_data_[thread_index];
    if (!tld.generated_lineitem[LINEITEM::L_EXTENDEDPRICE]) {
      tld.generated_lineitem[LINEITEM::L_EXTENDEDPRICE] = true;
      RETURN_NOT_OK(L_PARTKEY(thread_index));
      RETURN_NOT_OK(L_QUANTITY(thread_index));
      size_t ibatch = 0;
      for (int64_t irow = 0; irow < tld.lineitem_to_generate; ibatch++) {
        size_t batch_offset;
        RETURN_NOT_OK(AllocateLineItemBufferIfNeeded(
            thread_index, ibatch, LINEITEM::L_EXTENDEDPRICE, batch_offset));

        int64_t remaining_in_batch = static_cast<int64_t>(batch_size_ - batch_offset);
        int64_t next_run = std::min(tld.lineitem_to_generate - irow, remaining_in_batch);
        const int32_t* l_partkey = reinterpret_cast<const int32_t*>(
            tld.lineitem[ibatch][LINEITEM::L_PARTKEY].array()->buffers[1]->data());
        const Decimal128* l_quantity = reinterpret_cast<const Decimal128*>(
            tld.lineitem[ibatch][LINEITEM::L_QUANTITY].array()->buffers[1]->data());
        Decimal128* l_extendedprice =
            reinterpret_cast<Decimal128*>(tld.lineitem[ibatch][LINEITEM::L_EXTENDEDPRICE]
                                              .array()
                                              ->buffers[1]
                                              ->mutable_data());

        for (int64_t i = 0; i < next_run; i++, batch_offset++) {
          int64_t partkey = static_cast<int64_t>(l_partkey[batch_offset]);
          // Divide by 100 to recover the integer representation (not Decimal).
          int64_t quantity = static_cast<int64_t>(l_quantity[batch_offset]) / 100;

          // Spec says to divide by 100, but that happens automatically due to this being
          // stored to two decimal points.
          int64_t retail_price =
              (90000 + ((partkey / 10) % 20001) + 100 * (partkey % 1000));
          int64_t extended_price = retail_price * quantity;
          l_extendedprice[batch_offset] = {extended_price};
        }
        irow += next_run;
        RETURN_NOT_OK(SetLineItemColumnSize(thread_index, ibatch,
                                            LINEITEM::L_EXTENDEDPRICE, batch_offset));
      }
    }
    return Status::OK();
  }

  Status L_DISCOUNT(size_t thread_index) {
    ThreadLocalData& tld = thread_local_data_[thread_index];
    if (!tld.generated_lineitem[LINEITEM::L_DISCOUNT]) {
      tld.generated_lineitem[LINEITEM::L_DISCOUNT] = true;
      size_t ibatch = 0;
      std::uniform_int_distribution<int32_t> dist(0, 10);
      for (int64_t irow = 0; irow < tld.lineitem_to_generate; ibatch++) {
        size_t batch_offset;
        RETURN_NOT_OK(AllocateLineItemBufferIfNeeded(thread_index, ibatch,
                                                     LINEITEM::L_DISCOUNT, batch_offset));

        int64_t remaining_in_batch = static_cast<int64_t>(batch_size_ - batch_offset);
        int64_t next_run = std::min(tld.lineitem_to_generate - irow, remaining_in_batch);
        Decimal128* l_discount =
            reinterpret_cast<Decimal128*>(tld.lineitem[ibatch][LINEITEM::L_DISCOUNT]
                                              .array()
                                              ->buffers[1]
                                              ->mutable_data());

        for (int64_t i = 0; i < next_run; i++, batch_offset++)
          l_discount[batch_offset] = {dist(tld.rng)};
        irow += next_run;
        RETURN_NOT_OK(SetLineItemColumnSize(thread_index, ibatch, LINEITEM::L_DISCOUNT,
                                            batch_offset));
      }
    }
    return Status::OK();
  }

  Status L_TAX(size_t thread_index) {
    ThreadLocalData& tld = thread_local_data_[thread_index];
    if (!tld.generated_lineitem[LINEITEM::L_TAX]) {
      tld.generated_lineitem[LINEITEM::L_TAX] = true;
      size_t ibatch = 0;
      std::uniform_int_distribution<int32_t> dist(0, 8);
      for (int64_t irow = 0; irow < tld.lineitem_to_generate; ibatch++) {
        size_t batch_offset;
        RETURN_NOT_OK(AllocateLineItemBufferIfNeeded(thread_index, ibatch,
                                                     LINEITEM::L_TAX, batch_offset));
        int64_t remaining_in_batch = static_cast<int64_t>(batch_size_ - batch_offset);
        int64_t next_run = std::min(tld.lineitem_to_generate - irow, remaining_in_batch);
        Decimal128* l_tax = reinterpret_cast<Decimal128*>(
            tld.lineitem[ibatch][LINEITEM::L_TAX].array()->buffers[1]->mutable_data());

        for (int64_t i = 0; i < next_run; i++, batch_offset++)
          l_tax[batch_offset] = {dist(tld.rng)};
        irow += next_run;
        RETURN_NOT_OK(
            SetLineItemColumnSize(thread_index, ibatch, LINEITEM::L_TAX, batch_offset));
      }
    }
    return Status::OK();
  }

  Status L_RETURNFLAG(size_t thread_index) {
    ThreadLocalData& tld = thread_local_data_[thread_index];
    if (!tld.generated_lineitem[LINEITEM::L_RETURNFLAG]) {
      tld.generated_lineitem[LINEITEM::L_RETURNFLAG] = true;
      RETURN_NOT_OK(L_RECEIPTDATE(thread_index));
      size_t ibatch = 0;
      std::uniform_int_distribution<uint32_t> dist;
      for (int64_t irow = 0; irow < tld.lineitem_to_generate; ibatch++) {
        size_t batch_offset;
        RETURN_NOT_OK(AllocateLineItemBufferIfNeeded(
            thread_index, ibatch, LINEITEM::L_RETURNFLAG, batch_offset));

        int64_t remaining_in_batch = static_cast<int64_t>(batch_size_ - batch_offset);
        int64_t next_run = std::min(tld.lineitem_to_generate - irow, remaining_in_batch);
        char* l_returnflag =
            reinterpret_cast<char*>(tld.lineitem[ibatch][LINEITEM::L_RETURNFLAG]
                                        .array()
                                        ->buffers[1]
                                        ->mutable_data());
        const uint32_t* l_receiptdate = reinterpret_cast<const uint32_t*>(
            tld.lineitem[ibatch][LINEITEM::L_RECEIPTDATE]
                .array()
                ->buffers[1]
                ->mutable_data());

        for (int64_t i = 0; i < next_run; i++, batch_offset++) {
          if (l_receiptdate[batch_offset] <= kCurrentDate) {
            uint32_t r = dist(tld.rng);
            l_returnflag[batch_offset] = (r % 2 == 1) ? 'R' : 'A';
          } else {
            l_returnflag[batch_offset] = 'N';
          }
        }
        irow += next_run;
        RETURN_NOT_OK(SetLineItemColumnSize(thread_index, ibatch, LINEITEM::L_RETURNFLAG,
                                            batch_offset));
      }
    }
    return Status::OK();
  }

  Status L_LINESTATUS(size_t thread_index) {
    ThreadLocalData& tld = thread_local_data_[thread_index];
    if (!tld.generated_lineitem[LINEITEM::L_LINESTATUS]) {
      tld.generated_lineitem[LINEITEM::L_LINESTATUS] = true;
      RETURN_NOT_OK(L_SHIPDATE(thread_index));
      size_t ibatch = 0;
      for (int64_t irow = 0; irow < tld.lineitem_to_generate; ibatch++) {
        size_t batch_offset;
        RETURN_NOT_OK(AllocateLineItemBufferIfNeeded(
            thread_index, ibatch, LINEITEM::L_LINESTATUS, batch_offset));

        int64_t remaining_in_batch = static_cast<int64_t>(batch_size_ - batch_offset);
        int64_t next_run = std::min(tld.lineitem_to_generate - irow, remaining_in_batch);
        char* l_linestatus =
            reinterpret_cast<char*>(tld.lineitem[ibatch][LINEITEM::L_LINESTATUS]
                                        .array()
                                        ->buffers[1]
                                        ->mutable_data());
        const uint32_t* l_shipdate =
            reinterpret_cast<const uint32_t*>(tld.lineitem[ibatch][LINEITEM::L_SHIPDATE]
                                                  .array()
                                                  ->buffers[1]
                                                  ->mutable_data());

        for (int64_t i = 0; i < next_run; i++, batch_offset++) {
          if (l_shipdate[batch_offset] > kCurrentDate)
            l_linestatus[batch_offset] = 'O';
          else
            l_linestatus[batch_offset] = 'F';
        }
        irow += next_run;
        RETURN_NOT_OK(SetLineItemColumnSize(thread_index, ibatch, LINEITEM::L_LINESTATUS,
                                            batch_offset));
      }
    }
    return Status::OK();
  }

  Status L_SHIPDATE(size_t thread_index) {
    ThreadLocalData& tld = thread_local_data_[thread_index];
    if (!tld.generated_lineitem[LINEITEM::L_SHIPDATE]) {
      tld.generated_lineitem[LINEITEM::L_SHIPDATE] = true;
      RETURN_NOT_OK(O_ORDERDATE(thread_index));
      const int32_t* o_orderdate = reinterpret_cast<const int32_t*>(
          tld.orders[ORDERS::O_ORDERDATE].array()->buffers[1]->data());
      std::uniform_int_distribution<uint32_t> dist(1, 121);
      size_t ibatch = 0;
      size_t iorder = 0;
      int32_t iline = 0;
      for (int64_t irow = 0; irow < tld.lineitem_to_generate; ibatch++) {
        size_t batch_offset;
        RETURN_NOT_OK(AllocateLineItemBufferIfNeeded(thread_index, ibatch,
                                                     LINEITEM::L_SHIPDATE, batch_offset));
        int64_t remaining_in_batch = static_cast<int64_t>(batch_size_ - batch_offset);
        int64_t next_run = std::min(tld.lineitem_to_generate - irow, remaining_in_batch);
        uint32_t* l_shipdate =
            reinterpret_cast<uint32_t*>(tld.lineitem[ibatch][LINEITEM::L_SHIPDATE]
                                            .array()
                                            ->buffers[1]
                                            ->mutable_data());
        for (int64_t irun = 0; irun < next_run;) {
          for (; iline < tld.items_per_order[iorder] && irun < next_run; iline++, irun++)
            l_shipdate[batch_offset++] = o_orderdate[iorder] + dist(tld.rng);
          if (iline == tld.items_per_order[iorder]) {
            iline = 0;
            iorder++;
          }
        }
        irow += next_run;
        RETURN_NOT_OK(SetLineItemColumnSize(thread_index, ibatch, LINEITEM::L_SHIPDATE,
                                            batch_offset));
      }
    }
    return Status::OK();
  }

  Status L_COMMITDATE(size_t thread_index) {
    ThreadLocalData& tld = thread_local_data_[thread_index];
    if (!tld.generated_lineitem[LINEITEM::L_COMMITDATE]) {
      tld.generated_lineitem[LINEITEM::L_COMMITDATE] = true;
      const int32_t* o_orderdate = reinterpret_cast<const int32_t*>(
          tld.orders[ORDERS::O_ORDERDATE].array()->buffers[1]->data());
      std::uniform_int_distribution<uint32_t> dist(30, 90);
      size_t ibatch = 0;
      size_t iorder = 0;
      int32_t iline = 0;
      for (int64_t irow = 0; irow < tld.lineitem_to_generate; ibatch++) {
        size_t batch_offset;
        RETURN_NOT_OK(AllocateLineItemBufferIfNeeded(
            thread_index, ibatch, LINEITEM::L_COMMITDATE, batch_offset));

        int64_t remaining_in_batch = static_cast<int64_t>(batch_size_ - batch_offset);
        int64_t next_run = std::min(tld.lineitem_to_generate - irow, remaining_in_batch);
        uint32_t* l_commitdate =
            reinterpret_cast<uint32_t*>(tld.lineitem[ibatch][LINEITEM::L_COMMITDATE]
                                            .array()
                                            ->buffers[1]
                                            ->mutable_data());
        for (int64_t irun = 0; irun < next_run;) {
          for (; iline < tld.items_per_order[iorder] && irun < next_run; iline++, irun++)
            l_commitdate[batch_offset++] = o_orderdate[iorder] + dist(tld.rng);
          if (iline == tld.items_per_order[iorder]) {
            iline = 0;
            iorder++;
          }
        }
        irow += next_run;
        RETURN_NOT_OK(SetLineItemColumnSize(thread_index, ibatch, LINEITEM::L_COMMITDATE,
                                            batch_offset));
      }
    }
    return Status::OK();
  }

  Status L_RECEIPTDATE(size_t thread_index) {
    ThreadLocalData& tld = thread_local_data_[thread_index];
    if (!tld.generated_lineitem[LINEITEM::L_RECEIPTDATE]) {
      tld.generated_lineitem[LINEITEM::L_RECEIPTDATE] = true;
      RETURN_NOT_OK(L_SHIPDATE(thread_index));
      size_t ibatch = 0;
      std::uniform_int_distribution<int32_t> dist(1, 30);
      for (int64_t irow = 0; irow < tld.lineitem_to_generate; ibatch++) {
        size_t batch_offset;
        RETURN_NOT_OK(AllocateLineItemBufferIfNeeded(
            thread_index, ibatch, LINEITEM::L_RECEIPTDATE, batch_offset));

        int64_t remaining_in_batch = static_cast<int64_t>(batch_size_ - batch_offset);
        int64_t next_run = std::min(tld.lineitem_to_generate - irow, remaining_in_batch);
        uint32_t* l_receiptdate =
            reinterpret_cast<uint32_t*>(tld.lineitem[ibatch][LINEITEM::L_RECEIPTDATE]
                                            .array()
                                            ->buffers[1]
                                            ->mutable_data());
        const uint32_t* l_shipdate =
            reinterpret_cast<const uint32_t*>(tld.lineitem[ibatch][LINEITEM::L_SHIPDATE]
                                                  .array()
                                                  ->buffers[1]
                                                  ->mutable_data());

        for (int64_t i = 0; i < next_run; i++, batch_offset++)
          l_receiptdate[batch_offset] = l_shipdate[batch_offset] + dist(tld.rng);

        irow += next_run;
        RETURN_NOT_OK(SetLineItemColumnSize(thread_index, ibatch, LINEITEM::L_RECEIPTDATE,
                                            batch_offset));
      }
    }
    return Status::OK();
  }

  Status L_SHIPINSTRUCT(size_t thread_index) {
    ThreadLocalData& tld = thread_local_data_[thread_index];
    if (!tld.generated_lineitem[LINEITEM::L_SHIPINSTRUCT]) {
      tld.generated_lineitem[LINEITEM::L_SHIPINSTRUCT] = true;
      int32_t byte_width = kLineitemTypes[LINEITEM::L_SHIPINSTRUCT]->byte_width();
      size_t ibatch = 0;
      std::uniform_int_distribution<size_t> dist(0, kNumInstructions - 1);
      for (int64_t irow = 0; irow < tld.lineitem_to_generate; ibatch++) {
        size_t batch_offset;
        RETURN_NOT_OK(AllocateLineItemBufferIfNeeded(
            thread_index, ibatch, LINEITEM::L_SHIPINSTRUCT, batch_offset));

        int64_t remaining_in_batch = static_cast<int64_t>(batch_size_ - batch_offset);
        int64_t next_run = std::min(tld.lineitem_to_generate - irow, remaining_in_batch);
        char* l_shipinstruct =
            reinterpret_cast<char*>(tld.lineitem[ibatch][LINEITEM::L_SHIPINSTRUCT]
                                        .array()
                                        ->buffers[1]
                                        ->mutable_data());

        for (int64_t i = 0; i < next_run; i++, batch_offset++) {
          const char* str = Instructions[dist(tld.rng)];
          // Note that we don't have to memset the buffer to 0 because strncpy pads each
          // string with 0's anyway
          std::strncpy(l_shipinstruct + batch_offset * byte_width, str, byte_width);
        }
        irow += next_run;
        RETURN_NOT_OK(SetLineItemColumnSize(thread_index, ibatch,
                                            LINEITEM::L_SHIPINSTRUCT, batch_offset));
      }
    }
    return Status::OK();
  }

  Status L_SHIPMODE(size_t thread_index) {
    ThreadLocalData& tld = thread_local_data_[thread_index];
    if (!tld.generated_lineitem[LINEITEM::L_SHIPMODE]) {
      tld.generated_lineitem[LINEITEM::L_SHIPMODE] = true;
      int32_t byte_width = kLineitemTypes[LINEITEM::L_SHIPMODE]->byte_width();
      size_t ibatch = 0;
      std::uniform_int_distribution<size_t> dist(0, kNumModes - 1);
      for (int64_t irow = 0; irow < tld.lineitem_to_generate; ibatch++) {
        size_t batch_offset;
        RETURN_NOT_OK(AllocateLineItemBufferIfNeeded(thread_index, ibatch,
                                                     LINEITEM::L_SHIPMODE, batch_offset));

        int64_t remaining_in_batch = static_cast<int64_t>(batch_size_ - batch_offset);
        int64_t next_run = std::min(tld.lineitem_to_generate - irow, remaining_in_batch);
        char* l_shipmode =
            reinterpret_cast<char*>(tld.lineitem[ibatch][LINEITEM::L_SHIPMODE]
                                        .array()
                                        ->buffers[1]
                                        ->mutable_data());

        for (int64_t i = 0; i < next_run; i++, batch_offset++) {
          const char* str = Modes[dist(tld.rng)];
          std::strncpy(l_shipmode + batch_offset * byte_width, str, byte_width);
        }
        irow += next_run;
        RETURN_NOT_OK(SetLineItemColumnSize(thread_index, ibatch, LINEITEM::L_SHIPMODE,
                                            batch_offset));
      }
    }
    return Status::OK();
  }

  Status L_COMMENT(size_t thread_index) {
    ThreadLocalData& tld = thread_local_data_[thread_index];
    if (!tld.generated_lineitem[LINEITEM::L_COMMENT]) {
      tld.generated_lineitem[LINEITEM::L_COMMENT] = true;

      size_t ibatch = 0;
      for (int64_t irow = 0; irow < tld.lineitem_to_generate; ibatch++) {
        // Comments are kind of sneaky: we always generate the full batch and then just
        // bump the length
        size_t batch_offset = 0;
        if (tld.lineitem[ibatch][LINEITEM::L_COMMENT].kind() == Datum::NONE) {
          ARROW_ASSIGN_OR_RAISE(tld.lineitem[ibatch][LINEITEM::L_COMMENT],
                                g_text.GenerateComments(batch_size_, 10, 43, tld.rng));
          batch_offset = 0;
        }
        if (irow == 0) batch_offset = tld.first_batch_offset;

        int64_t remaining_in_batch = static_cast<int64_t>(batch_size_ - batch_offset);
        int64_t next_run = std::min(tld.lineitem_to_generate - irow, remaining_in_batch);

        batch_offset += next_run;
        irow += next_run;
        tld.lineitem[ibatch][LINEITEM::L_COMMENT].array()->length = batch_offset;
      }
    }
    return Status::OK();
  }

  struct ThreadLocalData {
    std::vector<Datum> orders;
    int64_t orders_to_generate;
    int64_t orderkey_start;

    std::vector<std::vector<Datum>> lineitem;
    std::vector<int> items_per_order;
    int64_t lineitem_to_generate;
    int64_t first_batch_offset;
    std::bitset<LINEITEM::kNumCols> generated_lineitem;
    random::pcg32_fast rng;
  };
  std::vector<ThreadLocalData> thread_local_data_;

  bool inited_ = false;
  std::mutex orders_output_queue_mutex_;
  std::mutex lineitem_output_queue_mutex_;
  std::queue<ExecBatch> orders_output_queue_;
  std::queue<ExecBatch> lineitem_output_queue_;
  int64_t batch_size_;
  double scale_factor_;
  int64_t orders_rows_to_generate_;
  int64_t orders_rows_generated_;
  std::vector<int> orders_cols_;
  std::vector<int> lineitem_cols_;

  std::atomic<size_t> orders_batches_generated_ = {0};
  std::atomic<size_t> lineitem_batches_generated_ = {0};
};

class SupplierGenerator : public TpchTableGenerator {
 public:
  Status Init(std::vector<std::string> columns, double scale_factor, int64_t batch_size,
              int64_t seed) override {
    scale_factor_ = scale_factor;
    batch_size_ = batch_size;
    rows_to_generate_ = static_cast<int64_t>(scale_factor_ * 10000);
    rows_generated_.store(0);
    ARROW_ASSIGN_OR_RAISE(schema_,
                          SetOutputColumns(columns, kTypes, kNameMap, gen_list_));

    random::pcg64_fast seed_rng(seed);
    seed_ = kSeedDist(seed_rng);

    random::pcg32_fast rng(kSeedDist(seed_rng));
    std::uniform_int_distribution<int64_t> dist(0, rows_to_generate_ - 1);
    size_t num_special_rows = static_cast<size_t>(5 * scale_factor_);
    std::unordered_set<int64_t> good_rows_set;
    while (good_rows_set.size() < num_special_rows) {
      int64_t row = dist(rng);
      good_rows_set.insert(row);
    }
    std::unordered_set<int64_t> bad_rows_set;
    while (bad_rows_set.size() < num_special_rows) {
      int64_t bad_row;
      do {
        bad_row = dist(rng);
      } while (good_rows_set.find(bad_row) != good_rows_set.end());
      bad_rows_set.insert(bad_row);
    }
    good_rows_.clear();
    bad_rows_.clear();
    good_rows_.insert(good_rows_.end(), good_rows_set.begin(), good_rows_set.end());
    bad_rows_.insert(bad_rows_.end(), bad_rows_set.begin(), bad_rows_set.end());
    std::sort(good_rows_.begin(), good_rows_.end());
    std::sort(bad_rows_.begin(), bad_rows_.end());
    return Status::OK();
  }

  Status StartProducing(size_t num_threads, OutputBatchCallback output_callback,
                        FinishedCallback finished_callback,
                        ScheduleCallback schedule_callback) override {
    thread_local_data_.resize(num_threads);
    for (ThreadLocalData& tld : thread_local_data_) tld.rng.seed(GetRandomSeed());

    output_callback_ = std::move(output_callback);
    finished_callback_ = std::move(finished_callback);
    schedule_callback_ = std::move(schedule_callback);
    for (size_t i = 0; i < num_threads; i++)
      RETURN_NOT_OK(schedule_callback_(
          [this](size_t thread_index) { return this->ProduceCallback(thread_index); }));
    return Status::OK();
  }

  std::shared_ptr<Schema> schema() const override { return schema_; }

 private:
#define FOR_EACH_COLUMN(F) \
  F(S_SUPPKEY)             \
  F(S_NAME)                \
  F(S_ADDRESS)             \
  F(S_NATIONKEY)           \
  F(S_PHONE)               \
  F(S_ACCTBAL)             \
  F(S_COMMENT)

#define MAKE_ENUM(col) col,
  struct SUPPLIER {
    enum {
      FOR_EACH_COLUMN(MAKE_ENUM) kNumCols,
    };
  };
#undef MAKE_ENUM
#define MAKE_STRING_MAP(col) {#col, SUPPLIER::col},
  const std::unordered_map<std::string, int> kNameMap = {
      FOR_EACH_COLUMN(MAKE_STRING_MAP)};
#undef MAKE_STRING_MAP
#define MAKE_FN_ARRAY(col) \
  [this](size_t thread_index) { return this->col(thread_index); },
  std::vector<GenerateColumnFn> kGenerators = {FOR_EACH_COLUMN(MAKE_FN_ARRAY)};
#undef MAKE_FN_ARRAY
#undef FOR_EACH_COLUMN

  std::vector<std::shared_ptr<DataType>> kTypes = {
      int32(), fixed_size_binary(25), utf8(),
      int32(), fixed_size_binary(15), decimal128(12, 2),
      utf8(),
  };

  Status ProduceCallback(size_t thread_index) {
    if (done_.load()) return Status::OK();
    ThreadLocalData& tld = thread_local_data_[thread_index];
    tld.suppkey_start = rows_generated_.fetch_add(batch_size_);
    if (tld.suppkey_start >= rows_to_generate_) return Status::OK();

    tld.to_generate = std::min(batch_size_, rows_to_generate_ - tld.suppkey_start);

    tld.batch.resize(SUPPLIER::kNumCols);
    std::fill(tld.batch.begin(), tld.batch.end(), Datum());
    for (int col : gen_list_) RETURN_NOT_OK(kGenerators[col](thread_index));

    std::vector<Datum> result(gen_list_.size());
    for (size_t i = 0; i < gen_list_.size(); i++) {
      int col_idx = gen_list_[i];
      result[i] = tld.batch[col_idx];
    }
    ARROW_ASSIGN_OR_RAISE(ExecBatch eb, ExecBatch::Make(std::move(result)));
    int64_t batches_to_generate = (rows_to_generate_ + batch_size_ - 1) / batch_size_;
    int64_t batches_outputted_before_this_one = batches_outputted_.fetch_add(1);
    bool is_last_batch = batches_outputted_before_this_one == (batches_to_generate - 1);
    ARROW_RETURN_NOT_OK(output_callback_(std::move(eb)));
    if (is_last_batch) {
      bool expected = false;
      if (done_.compare_exchange_strong(expected, true))
        ARROW_RETURN_NOT_OK(finished_callback_(batches_outputted_.load()));
      return Status::OK();
    }
    return schedule_callback_(
        [this](size_t thread_index) { return this->ProduceCallback(thread_index); });
  }

  Status AllocateColumn(size_t thread_index, int column) {
    ThreadLocalData& tld = thread_local_data_[thread_index];
    ARROW_DCHECK(tld.batch[column].kind() == Datum::NONE);
    int32_t byte_width = kTypes[column]->byte_width();
    ARROW_ASSIGN_OR_RAISE(std::unique_ptr<Buffer> buff,
                          AllocateBuffer(tld.to_generate * byte_width));
    ArrayData ad(kTypes[column], tld.to_generate, {nullptr, std::move(buff)});
    tld.batch[column] = std::move(ad);
    return Status::OK();
  }

  Status S_SUPPKEY(size_t thread_index) {
    ThreadLocalData& tld = thread_local_data_[thread_index];
    if (tld.batch[SUPPLIER::S_SUPPKEY].kind() == Datum::NONE) {
      RETURN_NOT_OK(AllocateColumn(thread_index, SUPPLIER::S_SUPPKEY));
      int32_t* s_suppkey = reinterpret_cast<int32_t*>(
          tld.batch[SUPPLIER::S_SUPPKEY].array()->buffers[1]->mutable_data());
      for (int64_t irow = 0; irow < tld.to_generate; irow++) {
        s_suppkey[irow] = static_cast<int32_t>(tld.suppkey_start + irow + 1);
      }
    }
    return Status::OK();
  }

  Status S_NAME(size_t thread_index) {
    ThreadLocalData& tld = thread_local_data_[thread_index];
    if (tld.batch[SUPPLIER::S_NAME].kind() == Datum::NONE) {
      RETURN_NOT_OK(S_SUPPKEY(thread_index));
      const int32_t* s_suppkey = reinterpret_cast<const int32_t*>(
          tld.batch[SUPPLIER::S_SUPPKEY].array()->buffers[1]->data());
      RETURN_NOT_OK(AllocateColumn(thread_index, SUPPLIER::S_NAME));
      int32_t byte_width = kTypes[SUPPLIER::S_NAME]->byte_width();
      char* s_name = reinterpret_cast<char*>(
          tld.batch[SUPPLIER::S_NAME].array()->buffers[1]->mutable_data());
      // Look man, I'm just following the spec ok? Section 4.2.3 as of March 1 2022
      const char* supplier = "Supplie#r";
      const size_t supplier_length = std::strlen(supplier);
      for (int64_t irow = 0; irow < tld.to_generate; irow++) {
        char* out = s_name + byte_width * irow;
        std::strncpy(out, supplier, byte_width);
        AppendNumberPaddedToNineDigits(out + supplier_length, s_suppkey[irow]);
      }
    }
    return Status::OK();
  }

  Status S_ADDRESS(size_t thread_index) {
    ThreadLocalData& tld = thread_local_data_[thread_index];
    if (tld.batch[SUPPLIER::S_ADDRESS].kind() == Datum::NONE) {
      ARROW_ASSIGN_OR_RAISE(tld.batch[SUPPLIER::S_ADDRESS],
                            RandomVString(tld.rng, tld.to_generate, 10, 40));
    }
    return Status::OK();
  }

  Status S_NATIONKEY(size_t thread_index) {
    ThreadLocalData& tld = thread_local_data_[thread_index];
    if (tld.batch[SUPPLIER::S_NATIONKEY].kind() == Datum::NONE) {
      RETURN_NOT_OK(AllocateColumn(thread_index, SUPPLIER::S_NATIONKEY));
      std::uniform_int_distribution<int32_t> dist(0, 24);
      int32_t* s_nationkey = reinterpret_cast<int32_t*>(
          tld.batch[SUPPLIER::S_NATIONKEY].array()->buffers[1]->mutable_data());
      for (int64_t irow = 0; irow < tld.to_generate; irow++)
        s_nationkey[irow] = dist(tld.rng);
    }
    return Status::OK();
  }

  Status S_PHONE(size_t thread_index) {
    ThreadLocalData& tld = thread_local_data_[thread_index];
    if (tld.batch[SUPPLIER::S_PHONE].kind() == Datum::NONE) {
      RETURN_NOT_OK(S_NATIONKEY(thread_index));
      RETURN_NOT_OK(AllocateColumn(thread_index, SUPPLIER::S_PHONE));
      int32_t byte_width = kTypes[SUPPLIER::S_PHONE]->byte_width();
      const int32_t* s_nationkey = reinterpret_cast<const int32_t*>(
          tld.batch[SUPPLIER::S_NATIONKEY].array()->buffers[1]->data());
      char* s_phone = reinterpret_cast<char*>(
          tld.batch[SUPPLIER::S_PHONE].array()->buffers[1]->mutable_data());
      for (int64_t irow = 0; irow < tld.to_generate; irow++) {
        GeneratePhoneNumber(s_phone + irow * byte_width, tld.rng, s_nationkey[irow]);
      }
    }
    return Status::OK();
  }

  Status S_ACCTBAL(size_t thread_index) {
    ThreadLocalData& tld = thread_local_data_[thread_index];
    if (tld.batch[SUPPLIER::S_ACCTBAL].kind() == Datum::NONE) {
      RETURN_NOT_OK(AllocateColumn(thread_index, SUPPLIER::S_ACCTBAL));
      Decimal128* s_acctbal = reinterpret_cast<Decimal128*>(
          tld.batch[SUPPLIER::S_ACCTBAL].array()->buffers[1]->mutable_data());
      std::uniform_int_distribution<int64_t> dist(-99999, 999999);
      for (int64_t irow = 0; irow < tld.to_generate; irow++)
        s_acctbal[irow] = {dist(tld.rng)};
    }
    return Status::OK();
  }

  Status S_COMMENT(size_t thread_index) {
    ThreadLocalData& tld = thread_local_data_[thread_index];
    if (tld.batch[SUPPLIER::S_COMMENT].kind() == Datum::NONE) {
      ARROW_ASSIGN_OR_RAISE(tld.batch[SUPPLIER::S_COMMENT],
                            g_text.GenerateComments(tld.to_generate, 25, 100, tld.rng));
      ModifyComments(thread_index, "Recommends", good_rows_);
      ModifyComments(thread_index, "Complaints", bad_rows_);
    }
    return Status::OK();
  }

  void ModifyComments(size_t thread_index, const char* review,
                      const std::vector<int64_t>& indices) {
    ThreadLocalData& tld = thread_local_data_[thread_index];
    const int32_t* offsets = reinterpret_cast<const int32_t*>(
        tld.batch[SUPPLIER::S_COMMENT].array()->buffers[1]->data());
    char* str = reinterpret_cast<char*>(
        tld.batch[SUPPLIER::S_COMMENT].array()->buffers[2]->mutable_data());
    const char* customer = "Customer";
    const int32_t customer_length = static_cast<int32_t>(std::strlen(customer));
    const int32_t review_length = static_cast<int32_t>(std::strlen(review));

    auto it = std::lower_bound(indices.begin(), indices.end(), tld.suppkey_start);
    for (; it != indices.end() && *it < tld.suppkey_start + tld.to_generate; it++) {
      int64_t idx_in_batch = *it - tld.suppkey_start;
      char* out = str + offsets[idx_in_batch];
      int32_t str_length = offsets[idx_in_batch + 1] - offsets[idx_in_batch];
      std::uniform_int_distribution<int32_t> gap_dist(
          0, str_length - customer_length - review_length);
      int32_t gap = gap_dist(tld.rng);
      int32_t total_length = customer_length + gap + review_length;
      std::uniform_int_distribution<int32_t> start_dist(0, str_length - total_length);
      int32_t start = start_dist(tld.rng);
      std::memcpy(out + start, customer, customer_length);
      std::memcpy(out + start + customer_length + gap, review, review_length);
    }
  }

  struct ThreadLocalData {
    random::pcg32_fast rng;
    int64_t suppkey_start;
    int64_t to_generate;
    std::vector<Datum> batch;
  };
  std::vector<ThreadLocalData> thread_local_data_;
  std::vector<int64_t> good_rows_;
  std::vector<int64_t> bad_rows_;

  OutputBatchCallback output_callback_;
  FinishedCallback finished_callback_;
  ScheduleCallback schedule_callback_;
  int64_t rows_to_generate_;
  std::atomic<int64_t> rows_generated_;
  double scale_factor_;
  int64_t batch_size_;
  std::vector<int> gen_list_;
  std::shared_ptr<Schema> schema_;
};

class PartGenerator : public TpchTableGenerator {
 public:
  explicit PartGenerator(std::shared_ptr<PartAndPartSupplierGenerator> gen)
      : gen_(std::move(gen)) {}

  Status Init(std::vector<std::string> columns, double scale_factor, int64_t batch_size,
              int64_t seed) override {
    seed_ = seed;
    scale_factor_ = scale_factor;
    batch_size_ = batch_size;
    ARROW_ASSIGN_OR_RAISE(schema_, gen_->SetPartOutputColumns(columns));
    return Status::OK();
  }

  Status StartProducing(size_t num_threads, OutputBatchCallback output_callback,
                        FinishedCallback finished_callback,
                        ScheduleCallback schedule_callback) override {
    RETURN_NOT_OK(gen_->Init(num_threads, batch_size_, scale_factor_, seed_));
    output_callback_ = std::move(output_callback);
    finished_callback_ = std::move(finished_callback);
    schedule_callback_ = std::move(schedule_callback);

    for (size_t i = 0; i < num_threads; i++)
      RETURN_NOT_OK(schedule_callback_(
          [this](size_t thread_index) { return this->ProduceCallback(thread_index); }));
    return Status::OK();
  }

  std::shared_ptr<Schema> schema() const override { return schema_; }

 private:
  Status ProduceCallback(size_t thread_index) {
    if (done_.load()) return Status::OK();
    ARROW_ASSIGN_OR_RAISE(std::optional<ExecBatch> maybe_batch,
                          gen_->NextPartBatch(thread_index));
    if (!maybe_batch.has_value()) {
      int64_t batches_generated = gen_->part_batches_generated();
      if (batches_generated == batches_outputted_.load()) {
        bool expected = false;
        if (done_.compare_exchange_strong(expected, true))
          ARROW_RETURN_NOT_OK(finished_callback_(batches_outputted_.load()));
        return Status::OK();
      }
      return schedule_callback_(
          [this](size_t thread_index) { return this->ProduceCallback(thread_index); });
    }
    ExecBatch batch = std::move(*maybe_batch);
    ARROW_RETURN_NOT_OK(output_callback_(std::move(batch)));
    batches_outputted_++;
    return schedule_callback_(
        [this](size_t thread_index) { return this->ProduceCallback(thread_index); });
  }

  OutputBatchCallback output_callback_;
  FinishedCallback finished_callback_;
  ScheduleCallback schedule_callback_;
  int64_t batch_size_;
  double scale_factor_;
  std::shared_ptr<PartAndPartSupplierGenerator> gen_;
  std::shared_ptr<Schema> schema_;
};

class PartSuppGenerator : public TpchTableGenerator {
 public:
  explicit PartSuppGenerator(std::shared_ptr<PartAndPartSupplierGenerator> gen)
      : gen_(std::move(gen)) {}

  Status Init(std::vector<std::string> columns, double scale_factor, int64_t batch_size,
              int64_t seed) override {
    seed_ = seed;
    scale_factor_ = scale_factor;
    batch_size_ = batch_size;
    ARROW_ASSIGN_OR_RAISE(schema_, gen_->SetPartSuppOutputColumns(columns));
    return Status::OK();
  }

  Status StartProducing(size_t num_threads, OutputBatchCallback output_callback,
                        FinishedCallback finished_callback,
                        ScheduleCallback schedule_callback) override {
    RETURN_NOT_OK(gen_->Init(num_threads, batch_size_, scale_factor_, seed_));
    output_callback_ = std::move(output_callback);
    finished_callback_ = std::move(finished_callback);
    schedule_callback_ = std::move(schedule_callback);

    for (size_t i = 0; i < num_threads; i++)
      RETURN_NOT_OK(schedule_callback_(
          [this](size_t thread_index) { return this->ProduceCallback(thread_index); }));
    return Status::OK();
  }

  std::shared_ptr<Schema> schema() const override { return schema_; }

 private:
  Status ProduceCallback(size_t thread_index) {
    if (done_.load()) return Status::OK();
    ARROW_ASSIGN_OR_RAISE(std::optional<ExecBatch> maybe_batch,
                          gen_->NextPartSuppBatch(thread_index));
    if (!maybe_batch.has_value()) {
      int64_t batches_generated = gen_->partsupp_batches_generated();
      if (batches_generated == batches_outputted_.load()) {
        bool expected = false;
        if (done_.compare_exchange_strong(expected, true))
          ARROW_RETURN_NOT_OK(finished_callback_(batches_outputted_.load()));
        return Status::OK();
      }
      return schedule_callback_(
          [this](size_t thread_index) { return this->ProduceCallback(thread_index); });
    }
    ExecBatch batch = std::move(*maybe_batch);
    ARROW_RETURN_NOT_OK(output_callback_(std::move(batch)));
    batches_outputted_++;
    return schedule_callback_(
        [this](size_t thread_index) { return this->ProduceCallback(thread_index); });
  }

  OutputBatchCallback output_callback_;
  FinishedCallback finished_callback_;
  ScheduleCallback schedule_callback_;
  int64_t batch_size_;
  double scale_factor_;
  std::shared_ptr<PartAndPartSupplierGenerator> gen_;
  std::shared_ptr<Schema> schema_;
};

class CustomerGenerator : public TpchTableGenerator {
 public:
  Status Init(std::vector<std::string> columns, double scale_factor, int64_t batch_size,
              int64_t seed) override {
    seed_ = seed;
    scale_factor_ = scale_factor;
    batch_size_ = batch_size;
    rows_to_generate_ = static_cast<int64_t>(scale_factor_ * 150000);
    rows_generated_.store(0);
    ARROW_ASSIGN_OR_RAISE(schema_,
                          SetOutputColumns(columns, kTypes, kNameMap, gen_list_));
    return Status::OK();
  }

  Status StartProducing(size_t num_threads, OutputBatchCallback output_callback,
                        FinishedCallback finished_callback,
                        ScheduleCallback schedule_callback) override {
    thread_local_data_.resize(num_threads);
    random::pcg64_fast seed_rng(seed_);
    for (ThreadLocalData& tld : thread_local_data_) tld.rng.seed(kSeedDist(seed_rng));

    output_callback_ = std::move(output_callback);
    finished_callback_ = std::move(finished_callback);
    schedule_callback_ = std::move(schedule_callback);
    for (size_t i = 0; i < num_threads; i++)
      RETURN_NOT_OK(schedule_callback_(
          [this](size_t thread_index) { return this->ProduceCallback(thread_index); }));
    return Status::OK();
  }

  std::shared_ptr<Schema> schema() const override { return schema_; }

 private:
#define FOR_EACH_COLUMN(F) \
  F(C_CUSTKEY)             \
  F(C_NAME)                \
  F(C_ADDRESS)             \
  F(C_NATIONKEY)           \
  F(C_PHONE)               \
  F(C_ACCTBAL)             \
  F(C_MKTSEGMENT)          \
  F(C_COMMENT)

#define MAKE_ENUM(col) col,
  struct CUSTOMER {
    enum {
      FOR_EACH_COLUMN(MAKE_ENUM) kNumCols,
    };
  };
#undef MAKE_ENUM
#define MAKE_STRING_MAP(col) {#col, CUSTOMER::col},
  const std::unordered_map<std::string, int> kNameMap = {
      FOR_EACH_COLUMN(MAKE_STRING_MAP)};
#undef MAKE_STRING_MAP
#define MAKE_FN_ARRAY(col) \
  [this](size_t thread_index) { return this->col(thread_index); },
  std::vector<GenerateColumnFn> kGenerators = {FOR_EACH_COLUMN(MAKE_FN_ARRAY)};
#undef MAKE_FN_ARRAY
#undef FOR_EACH_COLUMN

  std::vector<std::shared_ptr<DataType>> kTypes = {
      int32(),
      utf8(),
      utf8(),
      int32(),
      fixed_size_binary(15),
      decimal128(12, 2),
      fixed_size_binary(10),
      utf8(),
  };

  Status ProduceCallback(size_t thread_index) {
    if (done_.load()) return Status::OK();
    ThreadLocalData& tld = thread_local_data_[thread_index];
    tld.custkey_start = rows_generated_.fetch_add(batch_size_);
    if (tld.custkey_start >= rows_to_generate_) return Status::OK();

    tld.to_generate = std::min(batch_size_, rows_to_generate_ - tld.custkey_start);

    tld.batch.resize(CUSTOMER::kNumCols);
    std::fill(tld.batch.begin(), tld.batch.end(), Datum());
    for (int col : gen_list_) RETURN_NOT_OK(kGenerators[col](thread_index));

    std::vector<Datum> result(gen_list_.size());
    for (size_t i = 0; i < gen_list_.size(); i++) {
      int col_idx = gen_list_[i];
      result[i] = tld.batch[col_idx];
    }
    ARROW_ASSIGN_OR_RAISE(ExecBatch eb, ExecBatch::Make(std::move(result)));
    int64_t batches_to_generate = (rows_to_generate_ + batch_size_ - 1) / batch_size_;
    int64_t batches_generated_before_this_one = batches_outputted_.fetch_add(1);
    bool is_last_batch = batches_generated_before_this_one == (batches_to_generate - 1);
    ARROW_RETURN_NOT_OK(output_callback_(std::move(eb)));
    if (is_last_batch) {
      bool expected = false;
      if (done_.compare_exchange_strong(expected, true))
        ARROW_RETURN_NOT_OK(finished_callback_(batches_outputted_.load()));
      return Status::OK();
    }
    return schedule_callback_(
        [this](size_t thread_index) { return this->ProduceCallback(thread_index); });
  }

  Status AllocateColumn(size_t thread_index, int column) {
    ThreadLocalData& tld = thread_local_data_[thread_index];
    ARROW_DCHECK(tld.batch[column].kind() == Datum::NONE);
    int32_t byte_width = kTypes[column]->byte_width();
    ARROW_ASSIGN_OR_RAISE(std::unique_ptr<Buffer> buff,
                          AllocateBuffer(tld.to_generate * byte_width));
    ArrayData ad(kTypes[column], tld.to_generate, {nullptr, std::move(buff)});
    tld.batch[column] = std::move(ad);
    return Status::OK();
  }

  Status C_CUSTKEY(size_t thread_index) {
    ThreadLocalData& tld = thread_local_data_[thread_index];
    if (tld.batch[CUSTOMER::C_CUSTKEY].kind() == Datum::NONE) {
      RETURN_NOT_OK(AllocateColumn(thread_index, CUSTOMER::C_CUSTKEY));
      int32_t* c_custkey = reinterpret_cast<int32_t*>(
          tld.batch[CUSTOMER::C_CUSTKEY].array()->buffers[1]->mutable_data());
      for (int64_t irow = 0; irow < tld.to_generate; irow++) {
        c_custkey[irow] = static_cast<int32_t>(tld.custkey_start + irow + 1);
      }
    }
    return Status::OK();
  }

  Status C_NAME(size_t thread_index) {
    ThreadLocalData& tld = thread_local_data_[thread_index];
    if (tld.batch[CUSTOMER::C_NAME].kind() == Datum::NONE) {
      RETURN_NOT_OK(C_CUSTKEY(thread_index));
      const int32_t* c_custkey = reinterpret_cast<const int32_t*>(
          tld.batch[CUSTOMER::C_CUSTKEY].array()->buffers[1]->data());
      ARROW_ASSIGN_OR_RAISE(std::unique_ptr<Buffer> offset_buff,
                            AllocateBuffer((tld.to_generate + 1) * sizeof(int32_t)));
      int32_t* offsets = reinterpret_cast<int32_t*>(offset_buff->mutable_data());
      const char* customer = "Customer#";
      const size_t customer_length = std::strlen(customer);
      offsets[0] = 0;
      for (int64_t irow = 0; irow < tld.to_generate; irow++) {
        int num_digits = GetNumDigits(c_custkey[irow]);
        int num_chars = std::max(num_digits, 9);
        offsets[irow + 1] =
            static_cast<int32_t>(offsets[irow] + num_chars + customer_length);
      }
      ARROW_ASSIGN_OR_RAISE(std::unique_ptr<Buffer> str_buff,
                            AllocateBuffer(offsets[tld.to_generate]));
      char* str = reinterpret_cast<char*>(str_buff->mutable_data());
      for (int64_t irow = 0; irow < tld.to_generate; irow++) {
        char* out = str + offsets[irow];
        std::memcpy(out, customer, customer_length);
        AppendNumberPaddedToNineDigits(out + customer_length, c_custkey[irow]);
      }
      ArrayData ad(utf8(), tld.to_generate,
                   {nullptr, std::move(offset_buff), std::move(str_buff)});
      tld.batch[CUSTOMER::C_NAME] = std::move(ad);
    }
    return Status::OK();
  }

  Status C_ADDRESS(size_t thread_index) {
    ThreadLocalData& tld = thread_local_data_[thread_index];
    if (tld.batch[CUSTOMER::C_ADDRESS].kind() == Datum::NONE) {
      ARROW_ASSIGN_OR_RAISE(tld.batch[CUSTOMER::C_ADDRESS],
                            RandomVString(tld.rng, tld.to_generate, 10, 40));
    }
    return Status::OK();
  }

  Status C_NATIONKEY(size_t thread_index) {
    ThreadLocalData& tld = thread_local_data_[thread_index];
    if (tld.batch[CUSTOMER::C_NATIONKEY].kind() == Datum::NONE) {
      RETURN_NOT_OK(AllocateColumn(thread_index, CUSTOMER::C_NATIONKEY));
      std::uniform_int_distribution<int32_t> dist(0, 24);
      int32_t* c_nationkey = reinterpret_cast<int32_t*>(
          tld.batch[CUSTOMER::C_NATIONKEY].array()->buffers[1]->mutable_data());
      for (int64_t irow = 0; irow < tld.to_generate; irow++)
        c_nationkey[irow] = dist(tld.rng);
    }
    return Status::OK();
  }

  Status C_PHONE(size_t thread_index) {
    ThreadLocalData& tld = thread_local_data_[thread_index];
    if (tld.batch[CUSTOMER::C_PHONE].kind() == Datum::NONE) {
      RETURN_NOT_OK(C_NATIONKEY(thread_index));
      RETURN_NOT_OK(AllocateColumn(thread_index, CUSTOMER::C_PHONE));
      int32_t byte_width = kTypes[CUSTOMER::C_PHONE]->byte_width();
      const int32_t* c_nationkey = reinterpret_cast<const int32_t*>(
          tld.batch[CUSTOMER::C_NATIONKEY].array()->buffers[1]->data());
      char* c_phone = reinterpret_cast<char*>(
          tld.batch[CUSTOMER::C_PHONE].array()->buffers[1]->mutable_data());
      for (int64_t irow = 0; irow < tld.to_generate; irow++) {
        GeneratePhoneNumber(c_phone + irow * byte_width, tld.rng, c_nationkey[irow]);
      }
    }
    return Status::OK();
  }

  Status C_ACCTBAL(size_t thread_index) {
    ThreadLocalData& tld = thread_local_data_[thread_index];
    if (tld.batch[CUSTOMER::C_ACCTBAL].kind() == Datum::NONE) {
      RETURN_NOT_OK(AllocateColumn(thread_index, CUSTOMER::C_ACCTBAL));
      Decimal128* c_acctbal = reinterpret_cast<Decimal128*>(
          tld.batch[CUSTOMER::C_ACCTBAL].array()->buffers[1]->mutable_data());
      std::uniform_int_distribution<int64_t> dist(-99999, 999999);
      for (int64_t irow = 0; irow < tld.to_generate; irow++)
        c_acctbal[irow] = {dist(tld.rng)};
    }
    return Status::OK();
  }

  Status C_MKTSEGMENT(size_t thread_index) {
    ThreadLocalData& tld = thread_local_data_[thread_index];
    if (tld.batch[CUSTOMER::C_MKTSEGMENT].kind() == Datum::NONE) {
      RETURN_NOT_OK(AllocateColumn(thread_index, CUSTOMER::C_MKTSEGMENT));
      int32_t byte_width = kTypes[CUSTOMER::C_MKTSEGMENT]->byte_width();
      char* c_mktsegment = reinterpret_cast<char*>(
          tld.batch[CUSTOMER::C_MKTSEGMENT].array()->buffers[1]->mutable_data());
      std::uniform_int_distribution<int32_t> dist(0, kNumSegments - 1);
      for (int64_t irow = 0; irow < tld.to_generate; irow++) {
        char* out = c_mktsegment + irow * byte_width;
        int str_idx = dist(tld.rng);
        std::strncpy(out, Segments[str_idx], byte_width);
      }
    }
    return Status::OK();
  }

  Status C_COMMENT(size_t thread_index) {
    ThreadLocalData& tld = thread_local_data_[thread_index];
    if (tld.batch[CUSTOMER::C_COMMENT].kind() == Datum::NONE) {
      ARROW_ASSIGN_OR_RAISE(tld.batch[CUSTOMER::C_COMMENT],
                            g_text.GenerateComments(tld.to_generate, 29, 116, tld.rng));
    }
    return Status::OK();
  }

  struct ThreadLocalData {
    random::pcg32_fast rng;
    int64_t custkey_start;
    int64_t to_generate;
    std::vector<Datum> batch;
  };
  std::vector<ThreadLocalData> thread_local_data_;

  OutputBatchCallback output_callback_;
  FinishedCallback finished_callback_;
  ScheduleCallback schedule_callback_;
  int64_t rows_to_generate_{0};
  std::atomic<int64_t> rows_generated_ = {0};
  double scale_factor_{0};
  int64_t batch_size_{0};
  std::vector<int> gen_list_;
  std::shared_ptr<Schema> schema_;
};

class OrdersGenerator : public TpchTableGenerator {
 public:
  explicit OrdersGenerator(std::shared_ptr<OrdersAndLineItemGenerator> gen)
      : gen_(std::move(gen)) {}

  Status Init(std::vector<std::string> columns, double scale_factor, int64_t batch_size,
              int64_t seed) override {
    seed_ = seed;
    scale_factor_ = scale_factor;
    batch_size_ = batch_size;
    ARROW_ASSIGN_OR_RAISE(schema_, gen_->SetOrdersOutputColumns(columns));
    return Status::OK();
  }

  Status StartProducing(size_t num_threads, OutputBatchCallback output_callback,
                        FinishedCallback finished_callback,
                        ScheduleCallback schedule_callback) override {
    RETURN_NOT_OK(gen_->Init(num_threads, batch_size_, scale_factor_, seed_));
    output_callback_ = std::move(output_callback);
    finished_callback_ = std::move(finished_callback);
    schedule_callback_ = std::move(schedule_callback);

    for (size_t i = 0; i < num_threads; i++)
      RETURN_NOT_OK(schedule_callback_(
          [this](size_t thread_index) { return this->ProduceCallback(thread_index); }));
    return Status::OK();
  }

  std::shared_ptr<Schema> schema() const override { return schema_; }

 private:
  Status ProduceCallback(size_t thread_index) {
    if (done_.load()) return Status::OK();
    ARROW_ASSIGN_OR_RAISE(std::optional<ExecBatch> maybe_batch,
                          gen_->NextOrdersBatch(thread_index));
    if (!maybe_batch.has_value()) {
      int64_t batches_generated = gen_->orders_batches_generated();
      if (batches_generated == batches_outputted_.load()) {
        bool expected = false;
        if (done_.compare_exchange_strong(expected, true))
          ARROW_RETURN_NOT_OK(finished_callback_(batches_outputted_.load()));
        return Status::OK();
      }
      return schedule_callback_(
          [this](size_t thread_index) { return this->ProduceCallback(thread_index); });
    }
    ExecBatch batch = std::move(*maybe_batch);
    ARROW_RETURN_NOT_OK(output_callback_(std::move(batch)));
    batches_outputted_++;
    return schedule_callback_(
        [this](size_t thread_index) { return this->ProduceCallback(thread_index); });
  }

  OutputBatchCallback output_callback_;
  FinishedCallback finished_callback_;
  ScheduleCallback schedule_callback_;
  int64_t batch_size_;
  double scale_factor_;
  std::shared_ptr<OrdersAndLineItemGenerator> gen_;
  std::shared_ptr<Schema> schema_;
};

class LineitemGenerator : public TpchTableGenerator {
 public:
  explicit LineitemGenerator(std::shared_ptr<OrdersAndLineItemGenerator> gen)
      : gen_(std::move(gen)) {}

  Status Init(std::vector<std::string> columns, double scale_factor, int64_t batch_size,
              int64_t seed) override {
    seed_ = seed;
    scale_factor_ = scale_factor;
    batch_size_ = batch_size;
    ARROW_ASSIGN_OR_RAISE(schema_, gen_->SetLineItemOutputColumns(columns));
    return Status::OK();
  }

  Status StartProducing(size_t num_threads, OutputBatchCallback output_callback,
                        FinishedCallback finished_callback,
                        ScheduleCallback schedule_callback) override {
    RETURN_NOT_OK(gen_->Init(num_threads, batch_size_, scale_factor_, seed_));
    output_callback_ = std::move(output_callback);
    finished_callback_ = std::move(finished_callback);
    schedule_callback_ = std::move(schedule_callback);

    for (size_t i = 0; i < num_threads; i++)
      RETURN_NOT_OK(schedule_callback_(
          [this](size_t thread_index) { return this->ProduceCallback(thread_index); }));
    return Status::OK();
  }

  std::shared_ptr<Schema> schema() const override { return schema_; }

 private:
  Status ProduceCallback(size_t thread_index) {
    if (done_.load()) return Status::OK();
    ARROW_ASSIGN_OR_RAISE(std::optional<ExecBatch> maybe_batch,
                          gen_->NextLineItemBatch(thread_index));
    if (!maybe_batch.has_value()) {
      int64_t batches_generated = gen_->lineitem_batches_generated();
      if (batches_generated == batches_outputted_.load()) {
        bool expected = false;
        if (done_.compare_exchange_strong(expected, true))
          ARROW_RETURN_NOT_OK(finished_callback_(batches_outputted_.load()));
        return Status::OK();
      }
      // We may have generated but not outputted all of the batches.
      return schedule_callback_(
          [this](size_t thread_index) { return this->ProduceCallback(thread_index); });
    }
    ExecBatch batch = std::move(*maybe_batch);
    ARROW_RETURN_NOT_OK(output_callback_(std::move(batch)));
    batches_outputted_++;
    return schedule_callback_(
        [this](size_t thread_index) { return this->ProduceCallback(thread_index); });
  }

  OutputBatchCallback output_callback_;
  FinishedCallback finished_callback_;
  ScheduleCallback schedule_callback_;
  int64_t batch_size_;
  double scale_factor_;
  std::shared_ptr<OrdersAndLineItemGenerator> gen_;
  std::shared_ptr<Schema> schema_;
};

class NationGenerator : public TpchTableGenerator {
 public:
  Status Init(std::vector<std::string> columns, double /*scale_factor*/,
              int64_t /*batch_size*/, int64_t seed) override {
    ARROW_ASSIGN_OR_RAISE(schema_,
                          SetOutputColumns(columns, kTypes, kNameMap, column_indices_));
    seed_ = seed;
    rng_.seed(seed_);
    return Status::OK();
  }

  Status StartProducing(size_t /*num_threads*/, OutputBatchCallback output_callback,
                        FinishedCallback finished_callback,
                        ScheduleCallback /*schedule_task_callback*/) override {
    std::shared_ptr<Buffer> N_NATIONKEY_buffer = Buffer::Wrap(kNationKey, kRowCount);
    ArrayData N_NATIONKEY_arraydata(int32(), kRowCount,
                                    {nullptr, std::move(N_NATIONKEY_buffer)});

    ARROW_ASSIGN_OR_RAISE(std::unique_ptr<Buffer> N_NAME_buffer,
                          AllocateBuffer(kRowCount * kNameByteWidth));
    char* N_NAME = reinterpret_cast<char*>(N_NAME_buffer->mutable_data());
    for (size_t i = 0; i < kRowCount; i++)
      std::strncpy(N_NAME + kNameByteWidth * i, kCountryNames[i], kNameByteWidth);
    ArrayData N_NAME_arraydata(fixed_size_binary(kNameByteWidth), kRowCount,
                               {nullptr, std::move(N_NAME_buffer)});

    std::shared_ptr<Buffer> N_REGIONKEY_buffer = Buffer::Wrap(kRegionKey, kRowCount);
    ArrayData N_REGIONKEY_arraydata(int32(), kRowCount,
                                    {nullptr, std::move(N_REGIONKEY_buffer)});

    ARROW_ASSIGN_OR_RAISE(Datum N_COMMENT_datum,
                          g_text.GenerateComments(kRowCount, 31, 114, rng_));

    std::vector<Datum> fields = {
        std::move(N_NATIONKEY_arraydata), std::move(N_NAME_arraydata),
        std::move(N_REGIONKEY_arraydata), std::move(N_COMMENT_datum)};

    std::vector<Datum> result;
    for (const int& col : column_indices_) result.push_back(fields[col]);
    ARROW_ASSIGN_OR_RAISE(ExecBatch batch, ExecBatch::Make(std::move(result)));
    ARROW_RETURN_NOT_OK(output_callback(std::move(batch)));
    return finished_callback(static_cast<int64_t>(1));
  }

  std::shared_ptr<Schema> schema() const override { return schema_; }

 private:
  random::pcg32_fast rng_;

  static constexpr size_t kRowCount = 25;
  static constexpr int32_t kNameByteWidth = 25;
  static constexpr int32_t kNationKey[kRowCount] = {0,  1,  2,  3,  4,  5,  6,  7,  8,
                                                    9,  10, 11, 12, 13, 14, 15, 16, 17,
                                                    18, 19, 20, 21, 22, 23, 24};
  static constexpr const char* kCountryNames[kRowCount] = {
      "ALGERIA",      "ARGENTINA",  "BRAZIL",  "CANADA",         "EGYPT",
      "ETHIOPIA",     "FRANCE",     "GERMANY", "INDIA",          "INDONESIA",
      "IRAN",         "IRAQ",       "JAPAN",   "JORDAN",         "KENYA",
      "MOROCCO",      "MOZAMBIQUE", "PERU",    "CHINA",          "ROMANIA",
      "SAUDI ARABIA", "VIETNAM",    "RUSSIA",  "UNITED KINGDOM", "UNITED STATES"};
  static constexpr int32_t kRegionKey[kRowCount] = {0, 1, 1, 1, 4, 0, 3, 3, 2, 2, 4, 4, 2,
                                                    4, 0, 0, 0, 1, 2, 3, 4, 2, 3, 3, 1};

  struct NATION {
    enum {
      N_NATIONKEY,
      N_NAME,
      N_REGIONKEY,
      N_COMMENT,
    };
  };

  const std::unordered_map<std::string, int> kNameMap = {
      {"N_NATIONKEY", NATION::N_NATIONKEY},
      {"N_NAME", NATION::N_NAME},
      {"N_REGIONKEY", NATION::N_REGIONKEY},
      {"N_COMMENT", NATION::N_COMMENT},
  };

  std::vector<std::shared_ptr<DataType>> kTypes = {
      int32(),
      fixed_size_binary(kNameByteWidth),
      int32(),
      utf8(),
  };

  std::shared_ptr<Schema> schema_;
  std::vector<int> column_indices_;
};

constexpr int32_t NationGenerator::kNationKey[NationGenerator::kRowCount];
constexpr const char* NationGenerator::kCountryNames[NationGenerator::kRowCount];
constexpr int32_t NationGenerator::kRegionKey[NationGenerator::kRowCount];

class RegionGenerator : public TpchTableGenerator {
 public:
  Status Init(std::vector<std::string> columns, double /*scale_factor*/,
              int64_t /*batch_size*/, int64_t seed) override {
    ARROW_ASSIGN_OR_RAISE(schema_,
                          SetOutputColumns(columns, kTypes, kNameMap, column_indices_));
    seed_ = seed;
    rng_.seed(seed_);
    return Status::OK();
  }

  Status StartProducing(size_t num_threads, OutputBatchCallback output_callback,
                        FinishedCallback finished_callback,
                        ScheduleCallback /*schedule_task_callback*/) override {
    std::shared_ptr<Buffer> R_REGIONKEY_buffer = Buffer::Wrap(kRegionKey, kRowCount);
    ArrayData R_REGIONKEY_arraydata(int32(), kRowCount,
                                    {nullptr, std::move(R_REGIONKEY_buffer)});

    ARROW_ASSIGN_OR_RAISE(std::unique_ptr<Buffer> R_NAME_buffer,
                          AllocateBuffer(kRowCount * kNameByteWidth));
    char* R_NAME_data = reinterpret_cast<char*>(R_NAME_buffer->mutable_data());
    for (size_t i = 0; i < kRowCount; i++)
      std::strncpy(R_NAME_data + kNameByteWidth * i, kRegionNames[i], kNameByteWidth);
    ArrayData R_NAME_arraydata(kTypes[static_cast<int>(REGION::R_NAME)], kRowCount,
                               {nullptr, std::move(R_NAME_buffer)});

    ARROW_ASSIGN_OR_RAISE(Datum R_COMMENT_datum,
                          g_text.GenerateComments(kRowCount, 31, 115, rng_));

    std::vector<Datum> fields = {std::move(R_REGIONKEY_arraydata),
                                 std::move(R_NAME_arraydata), std::move(R_COMMENT_datum)};
    std::vector<Datum> result;
    for (const int& col : column_indices_) result.push_back(fields[col]);
    ARROW_ASSIGN_OR_RAISE(ExecBatch batch, ExecBatch::Make(std::move(result)));
    ARROW_RETURN_NOT_OK(output_callback(std::move(batch)));
    ARROW_RETURN_NOT_OK(finished_callback(static_cast<int64_t>(1)));
    return Status::OK();
  }

  std::shared_ptr<Schema> schema() const override { return schema_; }

  random::pcg32_fast rng_;

  static constexpr size_t kRowCount = 5;
  static constexpr int32_t kNameByteWidth = 25;
  static constexpr int32_t kRegionKey[kRowCount] = {0, 1, 2, 3, 4};
  static constexpr const char* kRegionNames[kRowCount] = {"AFRICA", "AMERICA", "ASIA",
                                                          "EUROPE", "MIDDLE EAST"};

  struct REGION {
    enum {
      R_REGIONKEY,
      R_NAME,
      R_COMMENT,
      kNumColumns,
    };
  };

  const std::unordered_map<std::string, int> kNameMap = {
      {"R_REGIONKEY", REGION::R_REGIONKEY},
      {"R_NAME", REGION::R_NAME},
      {"R_COMMENT", REGION::R_COMMENT},
  };

  const std::vector<std::shared_ptr<DataType>> kTypes = {
      int32(),
      fixed_size_binary(kNameByteWidth),
      utf8(),
  };

  std::shared_ptr<Schema> schema_;
  std::vector<int> column_indices_;
};

constexpr int32_t RegionGenerator::kRegionKey[RegionGenerator::kRowCount];
constexpr const char* RegionGenerator::kRegionNames[RegionGenerator::kRowCount];

class TpchNode : public ExecNode {
 public:
  TpchNode(ExecPlan* plan, const char* name,
           std::unique_ptr<TpchTableGenerator> generator)
      : ExecNode(plan, {}, {}, generator->schema()),
        name_(name),
        generator_(std::move(generator)) {}

  const char* kind_name() const override { return name_; }

  [[noreturn]] static void NoInputs() {
    Unreachable("TPC-H node should never have any inputs");
  }

  [[noreturn]] Status InputReceived(ExecNode*, ExecBatch) override { NoInputs(); }

  [[noreturn]] Status InputFinished(ExecNode*, int) override { NoInputs(); }

  Status StartProducing() override {
    RETURN_NOT_OK(generator_->StartProducing(
        plan_->query_context()->max_concurrency(),
        [this](ExecBatch batch) { return this->OutputBatchCallback(std::move(batch)); },
        [this](int64_t num_batches) { return this->FinishedCallback(num_batches); },
        [this](std::function<Status(size_t)> func) -> Status {
          return this->ScheduleTaskCallback(std::move(func));
        }));
    return Status::OK();
  }

  void PauseProducing(ExecNode* output, int32_t counter) override {
    // TODO(ARROW-16087)
  }
  void ResumeProducing(ExecNode* output, int32_t counter) override {
    // TODO(ARROW-16087)
  }

  Status StopProducingImpl() override {
    generator_->Abort();
    return Status::OK();
  }

 private:
  Status OutputBatchCallback(ExecBatch batch) {
    return output_->InputReceived(this, std::move(batch));
  }

  Status FinishedCallback(int64_t total_num_batches) {
    ARROW_RETURN_NOT_OK(
        output_->InputFinished(this, static_cast<int>(total_num_batches)));
    finished_generating_.store(true);
    return Status::OK();
  }

  Status ScheduleTaskCallback(std::function<Status(size_t)> func) {
    if (finished_generating_.load()) return Status::OK();
    plan_->query_context()->ScheduleTask(
        [this, func](size_t thread_index) {
          Status status = func(thread_index);
          if (!status.ok()) {
            ARROW_RETURN_NOT_OK(StopProducing());
          }
          return status;
        },
        "TpchNode::GenerateAndProcessBatch");
    return Status::OK();
  }

  const char* name_;
  std::unique_ptr<TpchTableGenerator> generator_;
  std::atomic<bool> finished_generating_{false};
};

class TpchGenImpl : public TpchGen {
 public:
  Result<ExecNode*> Supplier(std::vector<std::string> columns = {}) override;
  Result<ExecNode*> Part(std::vector<std::string> columns = {}) override;
  Result<ExecNode*> PartSupp(std::vector<std::string> columns = {}) override;
  Result<ExecNode*> Customer(std::vector<std::string> columns = {}) override;
  Result<ExecNode*> Orders(std::vector<std::string> columns = {}) override;
  Result<ExecNode*> Lineitem(std::vector<std::string> columns = {}) override;
  Result<ExecNode*> Nation(std::vector<std::string> columns = {}) override;
  Result<ExecNode*> Region(std::vector<std::string> columns = {}) override;

  TpchGenImpl(ExecPlan* plan, double scale_factor, int64_t batch_size, int64_t seed)
      : plan_(plan),
        scale_factor_(scale_factor),
        batch_size_(batch_size),
        seed_rng_(seed) {}

  template <typename Generator>
  Result<ExecNode*> CreateNode(const char* name, std::vector<std::string> columns);

  ExecPlan* plan_;
  double scale_factor_;
  int64_t batch_size_;
  random::pcg64_fast seed_rng_;

  std::shared_ptr<PartAndPartSupplierGenerator> part_and_part_supp_generator_{};
  std::shared_ptr<OrdersAndLineItemGenerator> orders_and_line_item_generator_{};
};

template <typename Generator>
Result<ExecNode*> TpchGenImpl::CreateNode(const char* name,
                                          std::vector<std::string> columns) {
  std::unique_ptr<Generator> generator = std::make_unique<Generator>();
  RETURN_NOT_OK(generator->Init(std::move(columns), scale_factor_, batch_size_,
                                kSeedDist(seed_rng_)));
  return plan_->EmplaceNode<TpchNode>(plan_, name, std::move(generator));
}

Result<ExecNode*> TpchGenImpl::Supplier(std::vector<std::string> columns) {
  return CreateNode<SupplierGenerator>("Supplier", std::move(columns));
}

Result<ExecNode*> TpchGenImpl::Part(std::vector<std::string> columns) {
  if (!part_and_part_supp_generator_) {
    part_and_part_supp_generator_ = std::make_shared<PartAndPartSupplierGenerator>();
  }
  std::unique_ptr<PartGenerator> generator =
      std::make_unique<PartGenerator>(part_and_part_supp_generator_);
  RETURN_NOT_OK(generator->Init(std::move(columns), scale_factor_, batch_size_,
                                kSeedDist(seed_rng_)));
  return plan_->EmplaceNode<TpchNode>(plan_, "Part", std::move(generator));
}

Result<ExecNode*> TpchGenImpl::PartSupp(std::vector<std::string> columns) {
  if (!part_and_part_supp_generator_) {
    part_and_part_supp_generator_ = std::make_shared<PartAndPartSupplierGenerator>();
  }
  std::unique_ptr<PartSuppGenerator> generator =
      std::make_unique<PartSuppGenerator>(part_and_part_supp_generator_);
  RETURN_NOT_OK(generator->Init(std::move(columns), scale_factor_, batch_size_,
                                kSeedDist(seed_rng_)));
  return plan_->EmplaceNode<TpchNode>(plan_, "PartSupp", std::move(generator));
}

Result<ExecNode*> TpchGenImpl::Customer(std::vector<std::string> columns) {
  return CreateNode<CustomerGenerator>("Customer", std::move(columns));
}

Result<ExecNode*> TpchGenImpl::Orders(std::vector<std::string> columns) {
  if (!orders_and_line_item_generator_) {
    orders_and_line_item_generator_ = std::make_shared<OrdersAndLineItemGenerator>();
  }
  std::unique_ptr<OrdersGenerator> generator =
      std::make_unique<OrdersGenerator>(orders_and_line_item_generator_);
  RETURN_NOT_OK(generator->Init(std::move(columns), scale_factor_, batch_size_,
                                kSeedDist(seed_rng_)));
  return plan_->EmplaceNode<TpchNode>(plan_, "Orders", std::move(generator));
}

Result<ExecNode*> TpchGenImpl::Lineitem(std::vector<std::string> columns) {
  if (!orders_and_line_item_generator_) {
    orders_and_line_item_generator_ = std::make_shared<OrdersAndLineItemGenerator>();
  }
  std::unique_ptr<LineitemGenerator> generator =
      std::make_unique<LineitemGenerator>(orders_and_line_item_generator_);
  RETURN_NOT_OK(generator->Init(std::move(columns), scale_factor_, batch_size_,
                                kSeedDist(seed_rng_)));
  return plan_->EmplaceNode<TpchNode>(plan_, "Lineitem", std::move(generator));
}

Result<ExecNode*> TpchGenImpl::Nation(std::vector<std::string> columns) {
  return CreateNode<NationGenerator>("Nation", std::move(columns));
}

Result<ExecNode*> TpchGenImpl::Region(std::vector<std::string> columns) {
  return CreateNode<RegionGenerator>("Region", std::move(columns));
}

}  // namespace

Result<std::unique_ptr<TpchGen>> TpchGen::Make(ExecPlan* plan, double scale_factor,
                                               int64_t batch_size,
                                               std::optional<int64_t> seed) {
  if (!seed.has_value()) seed = GetRandomSeed();
  return std::make_unique<TpchGenImpl>(plan, scale_factor, batch_size, *seed);
}

}  // namespace internal
}  // namespace acero
}  // namespace arrow
